### Data Cleaning with PySpark

#### Import Libraries

In [1]:
import pandas as pd
import time

from pyspark.sql.types import *

from pyspark.sql import SparkSession

#Import SQL functions
import pyspark.sql.functions as F 

#col package for column
from pyspark.sql.functions import col

#Import UDF function from sql library
from pyspark.sql.functions import udf

#Random ID generation partitioning wise
from pyspark.sql.functions import monotonically_increasing_id


#Broadcast function Implementation

from pyspark.sql.functions import broadcast

### Creating a Spark Session

In [2]:
spark = SparkSession.builder.getOrCreate()

### Define a Schema

In [3]:
people_schema = StructType([StructField('name', StringType(),False)
                           ,StructField('age', IntegerType(),False)
                            ,StructField('city',StringType(),False)
                           
                           ])

### Load the DataFrame for Lazy Processing



Lazy Processing in Spark is the idea that very little actually happens until an action is performed.

In [4]:
aafw_data = spark.read.format('csv').options(Header=True).load('../PySpark/AA_DFW_2017_Departures_Short.csv.gz')
aafw_data = aafw_data.withColumn('airport',F.lower(aafw_data['Destination Airport']))
aafw_data = aafw_data.drop(aafw_data['Destination Airport'])

aafw_data.show(3)

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2017|         0005|                          537|    hnl|
|       01/01/2017|         0007|                          498|    ogg|
|       01/01/2017|         0037|                          241|    sfo|
+-----------------+-------------+-----------------------------+-------+
only showing top 3 rows



### Saving a data format in Parquet format



The <b>Parquet</b> format is a columnar data store, allowing Spark to use predicate pushdown. This means Spark will only process the data necessary to complete the operations you define versus reading the entire dataset. This gives Spark more flexibility in accessing the data and often drastically improves performance on large datasets.





In [5]:
df1 = spark.read.csv('../PySpark/AA_DFW_2017_Departures_Short.csv',header=True)
df2 = spark.read.csv('../PySpark/AA_DFW_2016_Departures_Short.csv',header=True)

df3 = df1.union(df2)


df3 = df3.toPandas()

df3.to_parquet('AA_DFW_ALL.parquet')

print(spark.read.parquet('AA_DFW_ALL.parquet').count())


279962


<b>Please Note:</b>
    
    
 There was a technical issue in converting directly to a parquet file which could have been done using the following command:
        
<b>df3.write.parquet('AA_DFW_ALL.parquet',mode='overwrite')</b>
        
    
    As an alternative the dataframe was converted to a pandas dataframe and then to a parquet file.




### SQL and Parquet Operation:

In [6]:
flights_df = spark.read.option('header','false').parquet('AA_DFW_ALL.parquet') #reading a parquet file

flights_df.createOrReplaceTempView('flights')

avg_duration = spark.sql('select count("Flight Number") from flights').collect()[0]
avg_duration

Row(count(Flight Number)=279962)

### Filtering Column with PySpark


Filtering names which don't resemble to a name



In [7]:
voters_df = spark.read.csv('../PySpark/DallasCouncilVoters.csv',header=True)

#Select voters with distinct voter name
voters_df.select(voters_df['VOTER_NAME']).distinct()

#Length of voter from 1-20
voter_df_filter = voters_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME)<20')

#Votername should not contain _ 
voters_df = voter_df_filter.filter(~F.column('VOTER_NAME').contains('_'))


#Voter name 
voter_df_filter.select('VOTER_NAME').distinct().show(20,truncate=False)



+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|011018__42         |
|Mark  Clayton      |
|Casey Thomas       |
|Sandy  Greyson     |
|Mark Clayton       |
|Jennifer S.  Gates |
|Tiffinni A. Young  |
|B. Adam  McGough   |
|Omar Narvaez       |
|Philip T. Kingston |
|Rickey D. Callahan |
|Dwaine R. Caraway  |
+-------------------+
only showing top 20 rows



### Modifying DataFrames:
    
    
    
    Adding columns by splitting the voter_name to first name and last name.
    
The <b>.getItem(index)</b> takes an integer value to return the appropriately numbered item in the column. 

In [8]:

voters_df = voters_df.withColumn('splits', F.split(voters_df.VOTER_NAME, '\s+'))

print('\n\nSplits column will look like: \n')
voters_df.show(1)

#Split the column into first and last name using getitem(index) function

voters_df = voters_df.withColumn('first_name', voters_df.splits.getItem(0))
voters_df = voters_df.withColumn('last_name', voters_df.splits.getItem(F.size('splits') - 1))

#Dropping the splits column

print('\n \n After Splitting the column into first name and last name ')
#voters_df = voters_df.drop('splits')
voters_df.show(1)



Splits column will look like: 

+----------+-------------+-----------------+--------------------+
|      DATE|        TITLE|       VOTER_NAME|              splits|
+----------+-------------+-----------------+--------------------+
|02/08/2017|Councilmember|Jennifer S. Gates|[Jennifer, S., Ga...|
+----------+-------------+-----------------+--------------------+
only showing top 1 row


 
 After Splitting the column into first name and last name 
+----------+-------------+-----------------+--------------------+----------+---------+
|      DATE|        TITLE|       VOTER_NAME|              splits|first_name|last_name|
+----------+-------------+-----------------+--------------------+----------+---------+
|02/08/2017|Councilmember|Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|
+----------+-------------+-----------------+--------------------+----------+---------+
only showing top 1 row



### Conditional DataFrame in PySpark:
    
    
    when and otherwise column are the substitute of if-else statement.
    
    The when() clause lets you conditionally modify a Data Frame based on its content. 

In [9]:
voters_df = voters_df.withColumn('random_value',F.when(voters_df.TITLE=='Councilmember',F.rand()))
voters_df.show(1)

+----------+-------------+-----------------+--------------------+----------+---------+------------------+
|      DATE|        TITLE|       VOTER_NAME|              splits|first_name|last_name|      random_value|
+----------+-------------+-----------------+--------------------+----------+---------+------------------+
|02/08/2017|Councilmember|Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|0.4816924136182461|
+----------+-------------+-----------------+--------------------+----------+---------+------------------+
only showing top 1 row



<b>Multiple When and otherwise usage</b>

In [10]:

voters_df = voters_df.withColumn('random_val',F.when(voters_df.TITLE=='Councilmember',F.rand())
                                .when(voters_df.TITLE=='Mayor',2)
                                .otherwise(0))




voters_df.show(5)

+----------+-------------+-------------------+--------------------+----------+---------+-------------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|last_name|       random_value|         random_val|
+----------+-------------+-------------------+--------------------+----------+---------+-------------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates| 0.4816924136182461| 0.4241477076323842|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|0.20291972673953607| 0.9266698763601335|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|               null|                2.0|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|0.08655542731376109|0.05239023055019254|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|   Thomas|0.0378337

### User Defined Functions (UDF)


a. The return type from a UDF can be any defined type, even a full StructType() schema object.

b. The second argument for UDF can be ArrayType,IntegerType,LongType,StringType etc

c. The Second argument for UDF <b>cannot</b> be UDF.

                        



In [11]:
voters_df.drop('random_val')


#Define a function 

def getfirstmiddle(names):
    
    return ' '.join(names)



#udf wrapping for the function with Stringtype 
udffirstmiddle = udf(getfirstmiddle,StringType()) 
    
voters_df = voters_df.withColumn('getfirstmiddle',udffirstmiddle('splits'))

voters_df.show(5)
    

+----------+-------------+-------------------+--------------------+----------+---------+-------------------+-------------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|last_name|       random_value|         random_val|     getfirstmiddle|
+----------+-------------+-------------------+--------------------+----------+---------+-------------------+-------------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates| 0.4816924136182461| 0.4241477076323842|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|0.20291972673953607| 0.9266698763601335| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|               null|                2.0|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|0.0865554273137610

### Partitioning and Lazy Processing


<b>Monotonically increasing IDs :</b> pyspark.sql.functions.monotonically_increasing_id()
    
    -Integer (64-bit), increases in value, unique
    -Not necessarily sequential (gaps exist)
    -Completely parallel

In [12]:
#Adding ID field using monotonically increasing ID's

print("\nThere are %d partitions in the voter_df DataFrame.\n" % voters_df.rdd.getNumPartitions())
voters_df = voters_df.withColumn('ID',monotonically_increasing_id())
voters_df.show(1)


There are 1 partitions in the voter_df DataFrame.

+----------+-------------+-----------------+--------------------+----------+---------+------------------+------------------+-----------------+---+
|      DATE|        TITLE|       VOTER_NAME|              splits|first_name|last_name|      random_value|        random_val|   getfirstmiddle| ID|
+----------+-------------+-----------------+--------------------+----------+---------+------------------+------------------+-----------------+---+
|02/08/2017|Councilmember|Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|0.4816924136182461|0.4241477076323842|Jennifer S. Gates|  0|
+----------+-------------+-----------------+--------------------+----------+---------+------------------+------------------+-----------------+---+
only showing top 1 row



### ID Without Overlapping in Parallel 



For avoiding overlapping in ID id we can use previous id + F.monotonically_increasing_id()

Step 1: Take the previous id using the max() of ID

Step 2: Increment the next dataframe id using F.monotincally_increasing_id() + previous_id 


### Caching in PySpark:


Caching in Spark refers to storing the result of a DataFrame in memory or on disk of the processing nodes in a cluster. Caching
improves the speed for subsequent transformations or actions as the data likely no longer needs to be retrieved from the original data source. It reduces the acccess storage,networking and cpu of the Spark as the data is likely already present.

Caching is useful if you only plan to use the Dataframe again. 

In [13]:
departure_df = spark.read.csv('../PySpark/AA_DFW_2014_Departures_Short.csv',header=True)

In [14]:
start_time = time.time()

departure_cache = departure_df.distinct().cache()
print(' First call to Count of departure_df',departure_df.count() ,'in ',time.time()-start_time )

start_time= time.time()

print(' Second call to Count of departure_df',departure_df.count() ,'in ',time.time()-start_time )



 First call to Count of departure_df 157198 in  0.21143460273742676
 Second call to Count of departure_df 157198 in  0.1326453685760498


<b>Observation:</b> Though there is a fraction less difference still caching helps  in a cluster mode

### Clearing a Cache:
    
    
    .unpersist is used for clearing the cache

In [15]:
print('\n Is the dataframe cached ?-',departure_cache.is_cached)

print('\n Clearing the Cache ........')
departure_cache = departure_cache.unpersist()

print('\n Is the dataframe cached ?-',departure_cache.is_cached)




 Is the dataframe cached ?- True

 Clearing the Cache ........

 Is the dataframe cached ?- False


### Spark Configuration


In [16]:
app_name = spark.conf.get('spark.app.name')
driver_tcp_port = spark.conf.get('spark.driver.port')
partitions = spark.conf.get('spark.sql.shuffle.partitions')


print('\n App name : ', app_name)
print('\n Driver TCP Port:',driver_tcp_port )
print('\n Partitions :', partitions)



 App name :  pyspark-shell

 Driver TCP Port: 57597

 Partitions : 200


### Configuring Partitions for a Dataframe

In [17]:
before = departure_df.rdd.getNumPartitions()
print('\n Number of Partitions is',before)

spark.conf.set('spark.sql.shuffle.partitions', 1)

after = spark.read.csv('../PySpark/AA_DFW_2014_Departures_Short.csv',header=True)

print('\n\n After setting partition manually',departure_df.rdd.getNumPartitions())



 Number of Partitions is 2


 After setting partition manually 2


### Explain plan 

In [18]:
airports_df = spark.read.csv('../PySpark/airports.csv',header=True)
flights_df = spark.read.csv('../PySpark/flights_small.csv',header=True)

In [19]:
join_df = airports_df.join(flights_df,airports_df['faa']==flights_df['dest'])

print('Explain is as below:\n\n')
join_df.explain()

Explain is as below:


== Physical Plan ==
*(2) BroadcastHashJoin [faa#460], [dest#495], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
:  +- *(1) Project [faa#460, name#461, lat#462, lon#463, alt#464, tz#465, dst#466]
:     +- *(1) Filter isnotnull(faa#460)
:        +- *(1) FileScan csv [faa#460,name#461,lat#462,lon#463,alt#464,tz#465,dst#466] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/subhr/PySpark/airports.csv], PartitionFilters: [], PushedFilters: [IsNotNull(faa)], ReadSchema: struct<faa:string,name:string,lat:string,lon:string,alt:string,tz:string,dst:string>
+- *(2) Project [year#484, month#485, day#486, dep_time#487, dep_delay#488, arr_time#489, arr_delay#490, carrier#491, tailnum#492, flight#493, origin#494, dest#495, air_time#496, distance#497, hour#498, minute#499]
   +- *(2) Filter isnotnull(dest#495)
      +- *(2) FileScan csv [year#484,month#485,day#486,dep_time#487,dep_delay#488,arr_time#489,arr

### Broadcast Join with Explain Plan

In [20]:
normal_df2 = airports_df.join(broadcast(flights_df),airports_df['faa']==flights_df['dest'])

print('Explain Plan for Broadcast is as below:\n\n')
normal_df2.explain()


Explain Plan for Broadcast is as below:


== Physical Plan ==
*(2) BroadcastHashJoin [faa#460], [dest#495], Inner, BuildRight
:- *(2) Project [faa#460, name#461, lat#462, lon#463, alt#464, tz#465, dst#466]
:  +- *(2) Filter isnotnull(faa#460)
:     +- *(2) FileScan csv [faa#460,name#461,lat#462,lon#463,alt#464,tz#465,dst#466] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/subhr/PySpark/airports.csv], PartitionFilters: [], PushedFilters: [IsNotNull(faa)], ReadSchema: struct<faa:string,name:string,lat:string,lon:string,alt:string,tz:string,dst:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[11, string, true]))
   +- *(1) Project [year#484, month#485, day#486, dep_time#487, dep_delay#488, arr_time#489, arr_delay#490, carrier#491, tailnum#492, flight#493, origin#494, dest#495, air_time#496, distance#497, hour#498, minute#499]
      +- *(1) Filter isnotnull(dest#495)
         +- *(1) FileScan csv [year#484,month#485,day#486,dep_time#487,dep_delay

### Performance Time

In [21]:
start_time = time.time()

join_count = join_df.count()
normal_duration = time.time() - start_time

start_time2 = time.time()

broadcast_2 = normal_df2.count()
broadcast_duration = time.time()-start_time2

print('Normal join takes ',normal_duration)
print('Broadcast takes ',broadcast_duration)





Normal join takes  0.2403581142425537
Broadcast takes  0.13164687156677246


In [22]:
departures_df2 = spark.read.csv('../PySpark/AA_DFW_2015_Departures_Short.csv',header=True)

#Filter Actual elapsed time not to include 0 duration
departures_df2 = departures_df2.filter(departures_df2[3]==0)

#Adding a column
departures_df2 = departures_df2.withColumn('id',F.monotonically_increasing_id())

#Exporting the data to a JSON file

#departures_df2.write.json('departure.json',mode='overwrite')

### Column with .STARTSWITH function 

In [23]:
from pyspark.sql.functions import col
departures_df2.where(col('Destination Airport').startswith('A')).show(5)

+-----------------+-------------+-------------------+-----------------------------+---+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)| id|
+-----------------+-------------+-------------------+-----------------------------+---+
|       01/01/2015|         0194|                ATL|                            0|  5|
|       01/01/2015|         1023|                AUS|                            0| 13|
|       01/01/2015|         1484|                AUS|                            0| 27|
|       01/01/2015|         1662|                ABQ|                            0| 33|
|       01/02/2015|         1457|                AUS|                            0| 62|
+-----------------+-------------+-------------------+-----------------------------+---+
only showing top 5 rows

