In [0]:
# IMPORTS ON START OF LINE
# IMPORT DATATYPES

# import spark at start of line & create a spark variable
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import *
spark = SparkSession.builder.getOrCreate()

# import datatypes
from pyspark.sql.types import *
from pyspark.sql.functions import *


In [0]:
# LOADING CSV/JSON & OTHER TYPES OF FILES + CREATING DF ON THE LOADED FILES
# OPTIONS OF LOADING THE FILES 

# Import csv
df1=spark.read.option('header', 'true').csv('/FileStore/tables/employees_new.csv',inferSchema=True)
df1.show()

df2=spark.read.option('header', 'true').csv('/FileStore/tables/departments.csv',inferSchema=True)
df2.show()

# Import json
# df1 = spark.read.load("/FileStore/tables/employees.json")
# df1 = spark.read.load("/FileStore/tables/employees.json", format="json")

# Import parquet
# df1 = spark.read.load("/FileStore/tables/employees.parquet")
# df1 = spark.read.load("/FileStore/tables/employees.parquet", format="parquet")


+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|          null|       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|          null|       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|          null|       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|          null|       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|          null|       201|           20|


In [0]:
# CREATING DATAFRAMES MANUALLY

# Create dataframes manually
data = [('James', 'Sales', 3000),
	    ('Michael', 'Sales', 4600),
	    ('Robert', 'Sales', 4100)]
columns = ["Employee", "Department", "Salary"]
df_manual_create = spark.createDataFrame(data, columns)
df_manual_create.show()

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|   James|     Sales|  3000|
| Michael|     Sales|  4600|
|  Robert|     Sales|  4100|
+--------+----------+------+



In [0]:
# INSPECTING DATA
# CHECKING DF TYPES
# USING SHOW
# USING PRINTSCHEMA

# Return dataframe's column names and dataypes
df1.dtypes

# Display the content of df1
df1.show()

# Return first n rows
df1.head(10)

# Return first row
df1.first()

# Return the first n rows df1.schema
df1.take(2)

# Compute summary statistics df1.columns Return the columns of df1
df1.describe().show()

# Count the number of rows in df1
df1.count()

# Count the number of distinct rows in df1
df1.distinct().count()

# Print the schema of df1
df1.printSchema()

# Print the (logical and physical) plans
df1.explain()


+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|          null|       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|          null|       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|          null|       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|          null|       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|          null|       201|           20|


In [0]:
# SELECTING COLUMNS - SELECTING ALL & ONLY SELECT COLUMNS

df1.select('*').show()
df1.select('EMPLOYEE_ID', 'FIRST_NAME').show()
df1.select(df1.EMPLOYEE_ID, df1.FIRST_NAME).show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|          null|       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|          null|       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|          null|       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|          null|       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|          null|       201|           20|


In [0]:
# ADDING COLUMNS OF DF
# RENAMING COLUMNS OF DF
# DROPPING COLUMNS OF DF

# Adding columns - filled with null values
df_add_column1 = df1.withColumn('STATUS', lit(None))
df_add_column1.show()
 
# Adding columns - filled with derived values, use concat_ws to separate the column values, the first parameter is the spacer, in this case, an empty space
df_add_column2 = df1.withColumn('FULL_NAME', concat_ws(' ', df1.FIRST_NAME, df1.LAST_NAME))
df_add_column2.show()

# Renaming columns
df_rename_column1 = df1.withColumnRenamed('FIRST_NAME', 'FIRST_NAME_REN')
df_rename_column1.show()
 
# Removing columns
df_drop_column1 = df1.drop('FIRST_NAME', 'LAST_NAME')
df_drop_column1.show()
df_drop_column2 = df1.drop(df1.FIRST_NAME).drop(df1.LAST_NAME)
df_drop_column2.show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|STATUS|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|          null|       124|           50|  null|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|          null|       124|           50|  null|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|          null|       101|           10|  null|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|          null|       100|           20|  null|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP

In [0]:
# HANDLING NULL VAULES
    # - DROP ROWS WITH NULL VALUES WHEN ALL VALUES OF THE ROWS IS NULL
    # - DROP ROWS WITH NULL VALUES WHEN SOME VALUES OF THE ROWS IS NULL & LIMIT IT WITH THRESH()
    # - FILLING NULL VALUES WITH .FILL()
    
# Drop rows with any null value in the row
df_drop_row1 = df1.na.drop(how='any')
df_drop_row1.show()

# Drop rows where all of the values of the row is null
df_drop_row2 = df1.na.drop(how='all')
df_drop_row2.show()

# Drop rows where 2 or more of the values on the row is non-null
df_drop_row3 = df1.na.drop(how='any',thresh=2)
df_drop_row3.show()

# Drop rows if there are any null values on the rows only on the specified column using subset
df_drop_row4 = df1.na.drop(how='any', subset='COMMISSION_PCT')
df_drop_row4.show()

# Filling any null values with a specified value
df_fill_null_values1 = df1.na.fill('Missing Values')
df_fill_null_values1.show()

# Filling null values on specific columns with a value
df_fill_null_values2 = df1.na.fill({'COMMISSION_PCT': 'A', 'MANAGER_ID': 999})
df_fill_null_values2.show()

+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|EMAIL|PHONE_NUMBER|HIRE_DATE|JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+
+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|          null|       124|           50|
|        199|   Douglas|   

In [0]:
# FILTER OPERATIONS
# USING &, |, ==
# USING ~
# USING LIKE
# USING BETWEEN
# USING SUBSTRING

# Filter with Select
df1.filter(df1.SALARY>=5000).select('EMPLOYEE_ID', 'FIRST_NAME', 'SALARY').show()

# Filter with 'AND'
df1.filter((df1.DEPARTMENT_ID == 50) & (df1.MANAGER_ID  == 124)).select('EMPLOYEE_ID', 'FIRST_NAME', 'SALARY', 'DEPARTMENT_ID', 'MANAGER_ID').show()

# Filter with 'OR'
df1.filter((df1.DEPARTMENT_ID == 50) | (df1.MANAGER_ID  == 124)).select('EMPLOYEE_ID', 'FIRST_NAME', 'SALARY', 'DEPARTMENT_ID', 'MANAGER_ID').show()

# Filtering with 'Like'
df1.filter(df1.FIRST_NAME.like('Do%')).show()
 
# Filtering with 'Between'
df1.select("FIRST_NAME","SALARY").where(df1.SALARY.between(3000, 5000)).show()

# Filtering with 'Substring'
df1.select(substring(df1.FIRST_NAME, 1, 2)).show()

+-----------+-----------+------+
|EMPLOYEE_ID| FIRST_NAME|SALARY|
+-----------+-----------+------+
|        201|    Michael| 13000|
|        202|        Pat|  6000|
|        203|      Susan|  6500|
|        204|    Hermann| 10000|
|        205|    Shelley| 12008|
|        206|    William|  8300|
|        100|     Steven| 24000|
|        101|      Neena| 17000|
|        102|        Lex| 17000|
|        103|  Alexander|  9000|
|        104|      Bruce|  6000|
|        108|      Nancy| 12008|
|        109|     Daniel|  9000|
|        110|       John|  8200|
|        111|     Ismael|  7700|
|        112|Jose Manuel|  7800|
|        113|       Luis|  6900|
|        114|        Den| 11000|
|        120|    Matthew|  8000|
|        121|       Adam|  8200|
+-----------+-----------+------+
only showing top 20 rows

+-----------+----------+------+-------------+----------+
|EMPLOYEE_ID|FIRST_NAME|SALARY|DEPARTMENT_ID|MANAGER_ID|
+-----------+----------+------+-------------+----------+
|        19

In [0]:
# GROUP BY & AGGREGATES

# Group by & aggregate
df1.groupBy(df1.FIRST_NAME).count().show()
df1.groupBy(df1.FIRST_NAME).avg().show()
df1.groupBy(df1.FIRST_NAME).max().show()
  
# Group by with select & filter & order by
df1.groupBy('DEPARTMENT_ID').agg(sum('SALARY').alias("TOTAL_SALARY_DEPT")).filter(df1.DEPARTMENT_ID != 50).sort(df1.DEPARTMENT_ID.desc()).show() 

+-----------+-----+
| FIRST_NAME|count|
+-----------+-----+
|      Neena|    1|
|     Shelli|    1|
|      James|    2|
|      Jason|    1|
|    Shelley|    1|
|    Matthew|    1|
|   Jennifer|    1|
|Jose Manuel|    1|
|      Diana|    1|
|     Ismael|    1|
|      Mozhe|    1|
|    Hermann|    1|
|    Michael|    2|
|      Susan|    1|
|        Pat|    1|
|        Guy|    1|
|      Julia|    1|
|      Irene|    1|
|     Joshua|    1|
|     Donald|    1|
+-----------+-----+
only showing top 20 rows

+-----------+----------------+-----------+---------------+------------------+
| FIRST_NAME|avg(EMPLOYEE_ID)|avg(SALARY)|avg(MANAGER_ID)|avg(DEPARTMENT_ID)|
+-----------+----------------+-----------+---------------+------------------+
|      Neena|           101.0|    17000.0|          100.0|              90.0|
|     Shelli|           116.0|     2900.0|          114.0|              30.0|
|      James|           129.0|     2450.0|          120.5|              50.0|
|      Jason|           13

In [0]:
# JOINS

# Inner join
df_inner_join1 = df1.join(df2, df1.DEPARTMENT_ID == df2.DEPARTMENT_ID, 'inner').sort(df1.FIRST_NAME.asc())
df_inner_join1.show()

# Inner join with select, filter, order by
df_inner_join2 = df1.join(df2, df1.DEPARTMENT_ID == df2.DEPARTMENT_ID, 'inner').filter(df1.DEPARTMENT_ID == 50).select(df1.EMPLOYEE_ID, df1.FIRST_NAME, df2.DEPARTMENT_ID).sort(df1.FIRST_NAME.asc())
df_inner_join2.show()

# Left join, the df1 is the 'left' table here
df_left_join1 = df1.join(df2, df1.DEPARTMENT_ID == df2.DEPARTMENT_ID, 'left').sort(df1.FIRST_NAME.asc())
df_left_join1.show()

# Right join, the df2 is the 'right' table here
df_right_join1 = df1.join(df2, df1.DEPARTMENT_ID == df2.DEPARTMENT_ID, 'right').sort(df1.FIRST_NAME.asc())
df_right_join1.show()

# full join
df_full_join1 = df1.join(df2, df1.DEPARTMENT_ID == df2.DEPARTMENT_ID, 'full').sort(df1.FIRST_NAME.asc())
df_full_join1.show()

+-----------+----------+-----------+--------+------------+---------+----------+------+--------------+----------+-------------+-------------+----------------+----------+-----------+
|EMPLOYEE_ID|FIRST_NAME|  LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|DEPARTMENT_ID| DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-----------+----------+-----------+--------+------------+---------+----------+------+--------------+----------+-------------+-------------+----------------+----------+-----------+
|        121|      Adam|      Fripp|  AFRIPP|650.123.2234|10-APR-05|    ST_MAN|  8200|          null|       100|           50|           50|        Shipping|       121|       1500|
|        103| Alexander|     Hunold| AHUNOLD|590.423.4567|03-JAN-06|   IT_PROG|  9000|          null|       102|           60|           60|              IT|       103|       1400|
|        115| Alexander|       Khoo|   AKHOO|515.127.4562|18-MAY-03|  PU_CLERK|  3100|         

In [0]:
# ORDER BY
df1.filter(df1.DEPARTMENT_ID == 50).select('EMPLOYEE_ID', 'FIRST_NAME', 'DEPARTMENT_ID').sort('EMPLOYEE_ID', asc = True).show()

+-----------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|DEPARTMENT_ID|
+-----------+----------+-------------+
|        120|   Matthew|           50|
|        121|      Adam|           50|
|        122|     Payam|           50|
|        123|    Shanta|           50|
|        124|     Kevin|           50|
|        125|     Julia|           50|
|        126|     Irene|           50|
|        127|     James|           50|
|        128|    Steven|           50|
|        129|     Laura|           50|
|        130|     Mozhe|           50|
|        131|     James|           50|
|        132|        TJ|           50|
|        133|     Jason|           50|
|        134|   Michael|           50|
|        135|        Ki|           50|
|        136|     Hazel|           50|
|        137|    Renske|           50|
|        138|   Stephen|           50|
|        139|      John|           50|
+-----------+----------+-------------+
only showing top 20 rows



In [0]:
# INSERT ROWS

# Create columns identical to the table to be inserted
columns = ['EMPLOYEE_ID', 'FIRST_NAME', 'LAST_NAME', 'EMAIL', 'PHONE_NUMBER', 'HIRE_DATE', 'JOB_ID', 'SALARY', 'COMMISSION_PCT', 'MANAGER_ID', 'DEPARTMENT_ID']

# Create data to be inserted
new_row = spark.createDataFrame([(285, 'Mickey', 'Mouse', 'MICKEY', '505.122.333', '02-FEB-09', 'ST_CLERK', 2000, 10 , 103, 50)], columns)
new_row.show()

# Add new rows to existing df using union
df_insert1 = df1.union(new_row)
df_insert1.filter((df1.FIRST_NAME == 'Mickey') | (df1.FIRST_NAME == 'Donald')).show()

+-----------+----------+---------+------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME| EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+------+------------+---------+--------+------+--------------+----------+-------------+
|        285|    Mickey|    Mouse|MICKEY| 505.122.333|02-FEB-09|ST_CLERK|  2000|            10|       103|           50|
+-----------+----------+---------+------+------------+---------+--------+------+--------------+----------+-------------+

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        198|    Donald| 

In [0]:
# UPDATE ROWS

# Update rows, when Donald then update Donald2, when Jennifer then update to Jennifer2
df1.show()
df_update1 = df1.withColumn("FIRST_NAME", when(df1.FIRST_NAME == "Donald","Donald2").when(df1.FIRST_NAME == "Jennifer","Jennifer2").otherwise(df1.FIRST_NAME))
df_update1.show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|          null|       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|          null|       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|          null|       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|          null|       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|          null|       201|           20|


In [0]:
# DELETE ROWS

# You can just use filter the df and assign it to a new df since it will not include those rows
df_delete_rows = df1.filter((df1.DEPARTMENT_ID != 60) & (df1.DEPARTMENT_ID != 50) & (df1.DEPARTMENT_ID != 90))
df_delete_rows.show()

+-----------+-----------+----------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID| FIRST_NAME| LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+-----------+----------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        200|   Jennifer|    Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|          null|       101|           10|
|        201|    Michael| Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|          null|       100|           20|
|        202|        Pat|       Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|          null|       201|           20|
|        203|      Susan|    Mavris| SMAVRIS|515.123.7777|07-JUN-02|    HR_REP|  6500|          null|       101|           40|
|        204|    Hermann|      Baer|   HBAER|515.123.8888|07-JUN-02|    PR_REP| 10000|          null|       101

In [0]:
# REPARTITION
    # Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.

# Repartition by number of partitions
df_repartition1 = df1.repartition(5)
df_repartition1.rdd.getNumPartitions()

# Repartition by column name/s
df_repartition2 = df1.repartition('EMPLOYEE_ID', 'FIRST_NAME')
df_repartition2.rdd.getNumPartitions()

# Repartition by column name & specify number of partitions
df_repartition3 = df1.repartition(5, 'EMPLOYEE_ID')
df_repartition3.rdd.getNumPartitions()

Out[217]: 5

In [0]:
# PARTITIONBY
    # Partitions the output by the given columns on the file system.

# df1.write.partitionBy('FIRST_NAME').mode("overwrite").format("csv")

In [0]:
# WRITE TO FILES
# OPTIONS WHEN WRITING TO FILES
# WRITE OPTIONS:
    # overwrite – mode is used to overwrite the existing file.
    # append – To add the data to the existing file.
    # ignore – Ignores write operation when the file already exists.
    # error – This is a default option when the file already exists, it returns an error.

df1.write.option('header',True).mode('overwrite').csv('/FileStore/tables/employees_new2.csv')

