## Cleaning Data with PySpark

Working with data is tricky, and working with millions or even billions of rows is worse. Did you receive some data processing code written on a laptop with fairly pristine data? Chances are you've been put in charge of moving a basic data process from prototype to production. You may already have worked with real-world datasets, with missing fields, bizarre formatting, and orders of magnitude more data. Even if this is all new to you, this notebook walks through what's needed to prepare data processes using Python with Apache Spark.

### Defining a schema

- Creating a defined schema helps with data quality and import performance. As mentioned during the lesson, we'll create a simple schema to read in the following columns:

  - `Name`
  - `Age`
  - `City`

- The `Name` and `City` columns are `StringType()` and the `Age` column is `IntegerType()`.

In [1]:
# Import everything from the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for name, age, and city, each with the matching data type and not nullable
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False),
])

In [2]:
# Build a new SparkSession (or reuse the active one)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Expose the underlying SparkContext
sc = spark.sparkContext
sc

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/20 11:00:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# check the saved table in spark
spark.catalog.listTables()

[]

### Using lazy processing

- Lazy processing operations will usually return in about the same amount of time regardless of the actual quantity of data. Remember that this is due to Spark not performing any transformations until an action is requested.

- For this exercise, we'll define a DataFrame (`aa_dfw_df`) and add a couple of transformations. Note the amount of time required for the transformations to complete when defined versus when the data is actually queried. These differences may be short, but they will be noticeable. When working with a full Spark cluster with larger quantities of data, the difference will be more apparent.
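
The split between defining work and running it can be illustrated without a cluster. The following is only a plain-Python analogy built on generators, not a description of how Spark plans queries: `read_rows` and `add_airport` are hypothetical stand-ins for a data source and a transformation, and the row contents are made up. Defining the pipeline returns almost immediately, while the counting step (the "action") is where the time is spent.

```python
import time

def read_rows(n):
    """Hypothetical data source: yields rows only when something iterates over it."""
    for i in range(n):
        yield {"Destination Airport": "HNL" if i % 2 == 0 else "OGG"}

def add_airport(rows):
    """Hypothetical transformation: add a lower-cased airport column."""
    for row in rows:
        yield {**row, "airport": row["Destination Airport"].lower()}

# Defining the pipeline only creates generator objects; no row is processed yet.
start = time.perf_counter()
pipeline = add_airport(read_rows(200_000))
define_seconds = time.perf_counter() - start

# Counting forces every row through the pipeline, much like an action would.
start = time.perf_counter()
row_count = sum(1 for _ in pipeline)
action_seconds = time.perf_counter() - start

print(f"defining took {define_seconds:.6f}s")
print(f"counting {row_count} rows took {action_seconds:.6f}s")
```

The exact timings depend on the machine, so none are quoted here; the point is only the relative gap between the two steps.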

In [4]:
# Load the CSV file
import pyspark.sql.functions as F
aa_dfw_df = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/AA_DFW_2014_Departures_Short.csv', header=True)

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show(3)


+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2014|         0005|                          519|    hnl|
|       01/01/2014|         0007|                          505|    ogg|
|       01/01/2014|         0035|                          174|    slc|
+-----------------+-------------+-----------------------------+-------+
only showing top 3 rows



### Saving a DataFrame in Parquet format

- When working with Spark, you'll often start with CSV, JSON, or other data sources. This provides a lot of flexibility for the types of data to load, but it is not an optimal format for Spark. The Parquet format is a columnar data store, allowing Spark to use predicate pushdown. This means Spark will only process the data necessary to complete the operations you define versus reading the entire dataset. This gives Spark more flexibility in accessing the data and often drastically improves performance on large datasets.

- In this exercise, we're going to practice creating a new Parquet file and then process some data from it.

- The `spark` object and the `df1` and `df2` DataFrames have been set up for you.

In [5]:
df1 = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/AA_DFW_2014_Departures_Short.csv', header=True)
df2 = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/AA_DFW_2015_Departures_Short.csv', header=True)
print(type(df1))
print(type(df2))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [6]:
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())

df1 Count: 157198
df2 Count: 146558


303756


### SQL and Parquet

- Parquet files are perfect as a backing data store for SQL queries in Spark. While it is possible to run the same queries directly via Spark's Python functions, sometimes it's easier to run SQL queries alongside the Python options.

- For this example, we're going to read in the Parquet file we created in the last exercise and register it as a SQL table. Once registered, we'll run a quick query against the table (aka, the Parquet file).

- The `spark` object and the `AA_DFW_ALL.parquet` file are available for you automatically.

In [8]:
# check the saved table in spark
spark.catalog.listTables()

[]

In [9]:
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')
flights_df.show(3)


+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2015|         0005|                HNL|                          526|
|       01/01/2015|         0007|                OGG|                          517|
|       01/01/2015|         0023|                SFO|                          233|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 3 rows



In [10]:
flights_df= flights_df.withColumnRenamed("Actual elapsed time (Minutes)", "flight_duration")
flights_df.show(3)

+-----------------+-------------+-------------------+---------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|flight_duration|
+-----------------+-------------+-------------------+---------------+
|       01/01/2015|         0005|                HNL|            526|
|       01/01/2015|         0007|                OGG|            517|
|       01/01/2015|         0023|                SFO|            233|
+-----------------+-------------+-------------------+---------------+
only showing top 3 rows



In [11]:
# Filter rows where the destination airport code starts with 'A'
flights_df.filter(flights_df["Destination Airport"].startswith('A')).show(3)

+-----------------+-------------+-------------------+---------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|flight_duration|
+-----------------+-------------+-------------------+---------------+
|       01/01/2015|         0125|                ATL|            103|
|       01/01/2015|         0194|                ATL|              0|
|       01/01/2015|         0232|                ATL|            113|
+-----------------+-------------+-------------------+---------------+
only showing top 3 rows



In [12]:
# Register the temp table
flights_df.createOrReplaceTempView('flights')

# check the saved table in spark
spark.catalog.listTables()

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [13]:
type(flights_df)
flights_df.show(3)

+-----------------+-------------+-------------------+---------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|flight_duration|
+-----------------+-------------+-------------------+---------------+
|       01/01/2015|         0005|                HNL|            526|
|       01/01/2015|         0007|                OGG|            517|
|       01/01/2015|         0023|                SFO|            233|
+-----------------+-------------+-------------------+---------------+
only showing top 3 rows



In [14]:
flights_df.printSchema()

root
 |-- Date (MM/DD/YYYY): string (nullable = true)
 |-- Flight Number: string (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- flight_duration: string (nullable = true)



In [15]:
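# Run a SQL query of the average flight duration
# flight_duration is a string column (see the schema above), so avg() relies on Spark casting it to a number
# collect() returns a list of Row objects; [0][0] picks the single value out of the first Row
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0][0]
print('The average flight time is: %d' % avg_duration)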

The average flight time is: 143


### DataFrame Column Operations


#### Filtering column content with Python
You've looked at using various operations on DataFrame columns - now you can modify a real dataset. The DataFrame `voter_df` contains information regarding the voters on the Dallas City Council from the past few years. This truncated DataFrame contains the date of the vote being cast and the name and position of the voter. Your manager has asked you to clean this data so it can later be integrated into some desired reports. The primary task is to remove any null entries or odd characters and return a specific set of voters where you can validate their information.

This is often one of the first steps in data cleaning - removing anything that is obviously outside the format. For this dataset, make sure to look at the original data and see what looks out of place for the `VOTER_NAME` column.

The `pyspark.sql.functions` library is already imported under the alias F.

In [18]:
voter_df = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/DallasCouncilVoters.csv', header=True)
voter_df.show(3)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
+----------+-------------+-------------------+
only showing top 3 rows



In [24]:
# Show the distinct VOTER_NAME entries
# (.show() prints the rows and returns None, so its result is not assigned)
voter_df.select("VOTER_NAME").distinct().show(40, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|VOTER_NAME                                                                                                                                                                                                                                                                                                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [29]:
# Filter voter_df where the VOTER_NAME is 1-19 characters in length (length > 0 and < 20)
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
voter_df.show(3)

+----+-----+----------+
|DATE|TITLE|VOTER_NAME|
+----+-----+----------+
+----+-----+----------+



In [30]:

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

+----------+
|VOTER_NAME|
+----------+
+----------+
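
As a rough cross-check, the two filters above can be mirrored in plain Python. This is only a sketch of the rule, not Spark code: `looks_like_name` and the sample list are made up for illustration (the long entry and the empty string are hypothetical), and the explicit `None` check stands in for SQL's behavior, where `length(NULL)` is `NULL` and the row is therefore dropped by the filter.

```python
def looks_like_name(value):
    """Mirror of the two filters: 1-19 characters long and no underscore."""
    if value is None:
        # In Spark SQL a NULL condition is not true, so NULL names are filtered out.
        return False
    return 0 < len(value) < 20 and "_" not in value

samples = [
    "Jennifer S. Gates",
    "Michael S. Rawlings",   # exactly 19 characters, so it still passes
    "011018__42",            # contains underscores
    None,
    "a hypothetical free-text entry that is far too long to be a name",
    "",
]

kept = [s for s in samples if looks_like_name(s)]
print(kept)
```

Running the rule on a handful of hand-picked values like this is a cheap way to confirm the boundaries (here, that a 19-character name is kept) before applying the filter to the full DataFrame.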



### Modifying DataFrame columns

- Previously, you filtered out any rows that didn't conform to something generally resembling a name. Now, based on your earlier work, your manager has asked you to create two new columns: `first_name` and `last_name`. She asks you to split the `VOTER_NAME` column into words on any space character. You'll treat the last word as the `last_name` and the first word as the `first_name` (middle names are handled later with a user defined function). You'll be using some new functions in this exercise, including `.split()`, `.size()`, and `.getItem()`. The `.getItem(index)` method takes an integer value to return the appropriately numbered item in the column. The functions `.split()` and `.size()` are in the `pyspark.sql.functions` library.

- Please note that these operations are always somewhat specific to the use case. Having your data conform to a format often matters more than the specific details of the format. Rarely is a data cleaning task meant just for one person - matching a defined format allows for easier sharing of the data later (i.e., Paul doesn't need to worry about names - Mary already cleaned the dataset).

- The filtered voter DataFrame from your previous exercise is available as `voter_df`. The `pyspark.sql.functions` library is available under the alias `F`.
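
Before running the column operations, it can help to see the same logic on ordinary Python strings. The sketch below is a plain-Python mirror, not Spark code: `first_and_last` is a hypothetical helper, and `re.split` merely plays the role of `F.split` with a whitespace pattern. The sample names are taken from the voter data shown in this notebook.

```python
import re

def first_and_last(voter_name):
    """Split on runs of whitespace, then take the first and last words."""
    parts = re.split(r"\s+", voter_name)   # plays the role of F.split(col, r'\s+')
    first_name = parts[0]                  # plays the role of .getItem(0)
    last_name = parts[len(parts) - 1]      # plays the role of .getItem(F.size('split') - 1)
    return first_name, last_name

for name in ["Jennifer S. Gates", "Carolyn King Arnold", "B. Adam  McGough"]:
    print(name, "->", first_and_last(name))
```

Note the double space in `B. Adam  McGough`: splitting on the pattern `\s+` rather than on a single space avoids an empty string appearing in the middle of the list.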

In [65]:
# Reload the CSV file so the following cells start from the unfiltered data
voter_df = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/DallasCouncilVoters.csv', header=True)
voter_df.show(3)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
+----------+-------------+-------------------+
only showing top 3 rows



In [66]:
# Add a new column called split, separated on whitespace (raw string keeps \s intact)
voter_df = voter_df.withColumn('split', F.split(voter_df.VOTER_NAME, r'\s+'))
voter_df.show(3)

+----------+-------------+-------------------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|               split|
+----------+-------------+-------------------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|
+----------+-------------+-------------------+--------------------+
only showing top 3 rows



In [67]:
# Create a new column called first_name based on the first item in split
voter_df = voter_df.withColumn('first_name', voter_df.split.getItem(0))
voter_df.show(3)

+----------+-------------+-------------------+--------------------+----------+
|      DATE|        TITLE|         VOTER_NAME|               split|first_name|
+----------+-------------+-------------------+--------------------+----------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael|
+----------+-------------+-------------------+--------------------+----------+
only showing top 3 rows



In [68]:
# Get the last entry of the split list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.split.getItem(F.size('split')-1))
voter_df.show(3)

+----------+-------------+-------------------+--------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|               split|first_name|last_name|
+----------+-------------+-------------------+--------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|
+----------+-------------+-------------------+--------------------+----------+---------+
only showing top 3 rows





In [61]:
# Drop the split column
voter_df = voter_df.drop('split')

# Show the voter_df DataFrame
voter_df.show()

+----------+-------------+-------------------+-------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|         random_val|first_name|last_name|
+----------+-------------+-------------------+-------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates| 0.7301764769841849|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston| 0.7543098348614885|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|               null|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|0.45761288875457473|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas| 0.4761696744443019|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|   0.88596071846336|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs| 0.1450245707549751|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|0.47760497438660254|        B.|  McGough|
|02/08/2017|Councilmember|      

## Conditional DataFrame column operations


#### when() example
- The `when()` clause lets you conditionally modify a DataFrame based on its content. You'll want to modify our `voter_df` DataFrame to add a random number to any voting member that is defined as a "Councilmember".

- The `voter_df` DataFrame is defined and available to you. The `pyspark.sql.functions` library is available as `F`. You can use `F.rand()` to generate the random value.

In [41]:
data = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/DallasCouncilVoters.csv', header=True)
voter_df = data.select('*') # to copy the dataframe in pyspark
voter_df.show(3)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
+----------+-------------+-------------------+
only showing top 3 rows



In [45]:
# Add a column to voter_df for any voter with the title 'Councilmember'
voter_df = voter_df.withColumn('random_val', F.when(voter_df['TITLE'] == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show(2)

+----------+-------------+------------------+------------------+
|      DATE|        TITLE|        VOTER_NAME|        random_val|
+----------+-------------+------------------+------------------+
|02/08/2017|Councilmember| Jennifer S. Gates|0.7301764769841849|
|02/08/2017|Councilmember|Philip T. Kingston|0.7543098348614885|
+----------+-------------+------------------+------------------+
only showing top 2 rows



### When / Otherwise

- This requirement is similar to the last, but now you want to add multiple values based on the voter's position. Modify your `voter_df` DataFrame to add a random number to any voting member that is defined as a `Councilmember`. Use 2 for the `Mayor` and 0 for any other position.

- The `voter_df` DataFrame is defined and available to you. The `pyspark.sql.functions` library is available as `F`. You can use `F.rand()` to generate the random value.
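
The branching behind a `when()` / `otherwise()` chain can be sketched as an ordinary Python function. This is an illustration of the semantics only, under the assumption that the first matching condition wins and that rows matching no condition get null when `otherwise()` is left out; `random_val` and `random_val_no_otherwise` are hypothetical helpers, and `random.random()` stands in for `F.rand()`.

```python
import random

def random_val(title, rng=random):
    """First matching branch wins, like a when().when().otherwise() chain."""
    if title == "Councilmember":
        return rng.random()      # stand-in for F.rand(): a float in [0.0, 1.0)
    if title == "Mayor":
        return 2.0
    return 0.0                   # the otherwise() branch

def random_val_no_otherwise(title, rng=random):
    """Without otherwise(), unmatched rows get null (None here)."""
    if title == "Councilmember":
        return rng.random()
    return None

for title in ["Councilmember", "Mayor", "Deputy Mayor Pro Tem"]:
    print(title, random_val(title), random_val_no_otherwise(title))
```

The constants are written as `2.0` and `0.0` because the resulting Spark column holds doubles, which is why the literal values 2 and 0 show up as `2.0` and `0.0` in the cell output.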

In [46]:
voter_df1 = data.select('*') # to copy the dataframe in pyspark
voter_df1.show(3)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
+----------+-------------+-------------------+
only showing top 3 rows



In [47]:
# Add a column to voter_df1 named random_val with the results of the F.rand() method for any voter with the title Councilmember.
# Set random_val to 2 for the Mayor. Set any other title to the value 0.
voter_df1 = voter_df1.withColumn('random_val', 
                                 F.when(voter_df1['TITLE'] == 'Councilmember', F.rand())
                                 .when(voter_df1['TITLE'] == 'Mayor', 2).otherwise(0))
voter_df1.show(5)

+----------+-------------+-------------------+------------------+
|      DATE|        TITLE|         VOTER_NAME|        random_val|
+----------+-------------+-------------------+------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|0.5489358770778864|
|02/08/2017|Councilmember| Philip T. Kingston|0.9943753680960055|
|02/08/2017|        Mayor|Michael S. Rawlings|               2.0|
|02/08/2017|Councilmember|       Adam Medrano|0.4595390703545188|
|02/08/2017|Councilmember|       Casey Thomas|0.7497170038838954|
+----------+-------------+-------------------+------------------+
only showing top 5 rows



In [49]:
# Use the .filter() clause to find rows where random_val is 0
voter_df1.filter(voter_df1.random_val == 0).show(3)

+----------+--------------------+-----------------+----------+
|      DATE|               TITLE|       VOTER_NAME|random_val|
+----------+--------------------+-----------------+----------+
|04/25/2018|Deputy Mayor Pro Tem|     Adam Medrano|       0.0|
|04/25/2018|       Mayor Pro Tem|Dwaine R. Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|       0.0|
+----------+--------------------+-----------------+----------+
only showing top 3 rows



### User defined functions

#### Using user defined functions in Spark
- You've seen some of the power behind Spark's built-in string functions when it comes to manipulating DataFrames. However, once you reach a certain point, it becomes difficult to process the data without creating a rat's nest of function calls. Here's one place where you can use User Defined Functions to manipulate our DataFrames.

- For this exercise, we'll use our `voter_df` DataFrame, but you're going to replace the `first_name` column with the first and middle names.

- The `pyspark.sql.functions` library is available under the alias `F`. The classes from `pyspark.sql.types` are already imported.

In [69]:
voter_df.show(3)

+----------+-------------+-------------------+--------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|               split|first_name|last_name|
+----------+-------------+-------------------+--------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|
+----------+-------------+-------------------+--------------------+----------+---------+
only showing top 3 rows



In [53]:
# Edit the getFirstAndMiddle() function to return a space separated string of names, except the last entry in the names list.
def getFirstAndMiddle(names):
  # Return a space separated string of names
  return ' '.join(names[0:-1])

In [81]:

# Define the method as a UDF
from pyspark.sql.types import StringType
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.split))

voter_df.printSchema()


root
 |-- DATE: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER_NAME: string (nullable = true)
 |-- split: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_and_middle_name: string (nullable = true)
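
Because a UDF wraps an ordinary Python function, its logic can be checked on plain lists before it is ever applied to a column. The sketch below uses a hypothetical variant named `get_first_and_middle`; the `None` guard is an addition that is not in the original function, included on the assumption that the `split` column may contain nulls, which a Python UDF receives as `None`.

```python
def get_first_and_middle(names):
    """Join every entry except the last one with single spaces."""
    if names is None:
        # A null array value reaches a Python UDF as None; pass it through unchanged.
        return None
    return " ".join(names[:-1])

print(get_first_and_middle(["Jennifer", "S.", "Gates"]))
print(get_first_and_middle(["Adam", "Medrano"]))
print(repr(get_first_and_middle(["Medrano"])))
print(get_first_and_middle(None))
```

A single-word name yields an empty string rather than an error, which may or may not be the desired result for a given report; checking such edge cases on plain Python values is usually quicker than debugging them inside a Spark job.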



### Adding an ID Field

- When working with data, you sometimes only want to access certain fields and perform various operations. In this case, find all the unique voter names from the DataFrame and add a unique ID number. Remember that Spark IDs are assigned based on the DataFrame partition - as such the ID values may be much greater than the actual number of rows in the DataFrame.

- With Spark's lazy processing, the IDs are not actually generated until an action is performed and can be somewhat random depending on the size of the dataset.

- The spark session and a Spark DataFrame df containing the DallasCouncilVotes.csv.gz file are available in your workspace. The pyspark.sql.functions library is available under the alias F.
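
The reason IDs can be much larger than the row count follows from how the value is laid out. The Spark documentation for `monotonically_increasing_id()` describes the current implementation as placing the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below reproduces that arithmetic in plain Python; `sketch_id` and `decode_id` are hypothetical helpers for illustration, not part of PySpark.

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within a partition

def sketch_id(partition_index, row_in_partition):
    """Combine a partition index and a row offset the way the docs describe."""
    return (partition_index << RECORD_BITS) + row_in_partition

def decode_id(row_id):
    """Recover (partition_index, row_in_partition) from an ID."""
    return row_id >> RECORD_BITS, row_id & ((1 << RECORD_BITS) - 1)

# Two partitions with three rows each.
ids = [sketch_id(p, r) for p in range(2) for r in range(3)]
print(ids)
print(decode_id(8589934593))
```

With a single partition the partition index is always 0, so the IDs are simply 0, 1, 2, and so on. Each additional partition starts its IDs at the next multiple of 2**33 (8589934592), which keeps them unique and increasing but not consecutive.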

In [91]:
df = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/2DallasCouncilVotes.csv', header=True)
df.show(3)

+----------+------------------+---------+--------+-------------+-------------------+---------+------------------+-----------------------+------------------+--------------------+
|      DATE|AGENDA_ITEM_NUMBER|ITEM_TYPE|DISTRICT|        TITLE|         VOTER NAME|VOTE CAST|FINAL ACTION TAKEN|AGENDA ITEM DESCRIPTION|         AGENDA_ID|             VOTE_ID|
+----------+------------------+---------+--------+-------------+-------------------+---------+------------------+-----------------------+------------------+--------------------+
|02/08/2017|                 1|   AGENDA|      13|Councilmember|  Jennifer S. Gates|      N/A|  NO ACTION NEEDED|          Call to Order|020817__Special__1|020817__Special__...|
|02/08/2017|                 1|   AGENDA|      14|Councilmember| Philip T. Kingston|      N/A|  NO ACTION NEEDED|          Call to Order|020817__Special__1|020817__Special__...|
|02/08/2017|                 1|   AGENDA|      15|        Mayor|Michael S. Rawlings|      N/A|  NO ACTION NEED

In [93]:
# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct()
voter_df.show(3)

+--------------+
|    VOTER NAME|
+--------------+
|Tennell Atkins|
|  Scott Griggs|
| Scott  Griggs|
+--------------+
only showing top 3 rows



In [94]:

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())


There are 36 rows in the voter_df DataFrame.



In [95]:
# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df.show(3)

+--------------+------+
|    VOTER NAME|ROW_ID|
+--------------+------+
|Tennell Atkins|     0|
|  Scott Griggs|     1|
| Scott  Griggs|     2|
+--------------+------+
only showing top 3 rows



In [97]:
# Show the 10 rows with the lowest IDs
voter_df.orderBy(voter_df.ROW_ID.asc()).show(10)

+-------------------+------+
|         VOTER NAME|ROW_ID|
+-------------------+------+
|     Tennell Atkins|     0|
|       Scott Griggs|     1|
|      Scott  Griggs|     2|
|      Sandy Greyson|     3|
|Michael S. Rawlings|     4|
|       Kevin Felder|     5|
|       Adam Medrano|     6|
|               null|     7|
|   Casey Thomas, II|     8|
|      Mark  Clayton|     9|
+-------------------+------+
only showing top 10 rows



### IDs with different partitions

- You've just completed adding an ID field to a DataFrame. Now, take a look at what happens when you do the same thing on DataFrames containing a different number of partitions.

- To check the number of partitions, use the method .rdd.getNumPartitions() on a DataFrame.

- The spark session and two DataFrames, voter_df and voter_df_single, are available in your workspace. The instructions will help you discover the difference between the DataFrames. The pyspark.sql.functions library is available under the alias F.

In [98]:
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())


There are 1 partitions in the voter_df DataFrame.



In [99]:
# Add a ROW_ID field to the DataFrame
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

In [101]:
# Show the top 10 IDs in the DataFrame
# (.show() prints the rows itself and returns None, so it is not wrapped in print())
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)


+--------------------+------+
|          VOTER NAME|ROW_ID|
+--------------------+------+
|  the  final  201...|    35|
|  the  final   20...|    34|
|   the   final  2...|    33|
|  the  final  201...|    32|
|   the   final  2...|    31|
| the final 2018 A...|    30|
|  the  final   20...|    29|
|          011018__42|    28|
|        Lee Kleinman|    27|
|         Erik Wilson|    26|
+--------------------+------+
only showing top 10 rows


## Caching a DataFrame

- You've been assigned a task that requires running several analysis operations on a DataFrame. You've learned that caching can improve performance when reusing DataFrames and would like to implement it.

- You'll be working with a new dataset consisting of airline departure information. It may have repetitive data and will need to be de-duplicated.

- The DataFrame departures_df is defined, but no actions have been performed.



In [106]:
departures_df = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/AA_DFW_2016_Departures_Short.csv', header=True, inferSchema=True)
departures_df.show(3)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2016|            5|                HNL|                          529|
|       01/01/2016|            7|                OGG|                          512|
|       01/01/2016|           25|                PHL|                          161|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 3 rows



In [107]:
import time 
start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

Counting 140604 rows took 3.456379 seconds
Counting 140604 rows again took 0.868809 seconds


### Removing a DataFrame from cache

- You've finished the analysis tasks with the departures_df DataFrame, but have some other processing to do. You'd like to remove the DataFrame from the cache to prevent any excess memory usage on your cluster.

- The DataFrame departures_df is defined and has already been cached for you.

In [108]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False
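
The pattern in the last two exercises (do the expensive work on the first action, reuse the stored result afterwards, release it when finished) can be mimicked with a small toy class. This is a plain-Python analogy only: `ToyCache` is hypothetical and says nothing about how Spark stores cached partitions, and the duplicate row in the sample data is made up.

```python
class ToyCache:
    """Toy stand-in for a cached result: computes once, then reuses it."""

    def __init__(self, compute):
        self._compute = compute      # the expensive, deferred work
        self._stored = None
        self.materialized = False
        self.compute_calls = 0

    def count(self):
        if not self.materialized:    # first action: do the work and keep the result
            self._stored = self._compute()
            self.compute_calls += 1
            self.materialized = True
        return len(self._stored)

    def unpersist(self):
        """Drop the stored result so the memory can be reclaimed."""
        self._stored = None
        self.materialized = False

rows = [("01/01/2016", 5, "HNL"), ("01/01/2016", 5, "HNL"), ("01/01/2016", 7, "OGG")]
distinct_rows = ToyCache(lambda: sorted(set(rows)))   # de-duplication is the deferred work

first = distinct_rows.count()    # computes and stores the distinct rows
second = distinct_rows.count()   # served from the stored result
print(first, second, distinct_rows.compute_calls)

distinct_rows.unpersist()
print(distinct_rows.materialized)
```

After `unpersist()`, the next `count()` would redo the de-duplication, which matches the trade-off described above: caching saves time on reuse at the cost of holding the result in memory.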


### File import performance

- You've been given a large set of data to import into a Spark DataFrame. You'd like to test the difference in import speed by splitting up the file.

- You have two types of files available: `departures_full.txt.gz` and `departures_xxx.txt.gz` where xxx is 000 - 013. The same number of rows is split between each file.

In [115]:
# Import the full and split files into DataFrames
full_df = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/AA_DFW_2016_Departures_Short.csv', header=True, inferSchema=True)
split_df = spark.read.csv('/Users/ssamilozkan/Desktop/pyspark/cleaning_data_with_pyspark/dataset/AA_DFW_2016_*.csv', header=True, inferSchema=True)

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

Total rows in full DataFrame:	140604
Time to run: 0.175895
Total rows in split DataFrame:	140604
Time to run: 0.127455
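
One thing worth checking in the cell above is which files the `AA_DFW_2016_*.csv` pattern actually picks up. The sketch below uses Python's `fnmatch` as a rough stand-in for Spark's path globbing (an approximation, since Spark resolves globs through its own file system layer) and assumes the dataset folder holds only the CSV files referenced in this notebook. Under that assumption the pattern matches a single file, the same one loaded into `full_df`, which would be consistent with both DataFrames reporting 140604 rows.

```python
from fnmatch import fnmatch

pattern = "AA_DFW_2016_*.csv"

# Assumed folder contents: only the CSV files that appear elsewhere in this notebook.
files = [
    "AA_DFW_2014_Departures_Short.csv",
    "AA_DFW_2015_Departures_Short.csv",
    "AA_DFW_2016_Departures_Short.csv",
    "DallasCouncilVoters.csv",
]

matched = [name for name in files if fnmatch(name, pattern)]
print(matched)
```

To compare a single large file against genuinely split files, the split parts would need names that the pattern matches and the full file's name does not.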
