In [None]:
# DATACAMP COURSE "CLEANING DATA WITH PYSPARK"
# UNIT 1

# Defining a Schema
# -- useful to filter out garbage records from a dataset when reading in. 
# -- defines the names of the columns, their datatypes, and whether or not values can be NULL

# define imports
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Define a new schema using StructType (note the TYPE): it is the container,
# and it holds one StructField (note the FIELD) per column.
# Each StructField gets the column name, its Spark datatype, and True to allow NULLs.
people_schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ('name', StringType()),
            ('age', IntegerType()),
            ('city', StringType()),
        )
    ]
)

In [None]:
# Immutability and lazy processing
# Spark is designed to use immutable objects
# Spark is predominantly functional programming
# Spark dataframes are defined once and are not modifiable
# If updates are made, a new copy is made; this allows spark to avoid issues with concurrent objects

# read in a hypothetical dataset:
# header=True takes the column names from the first line of the file; without it
# the columns are named _c0, _c1, ... and 'voter_df.year' below would not exist
voter_df = spark.read.csv('voterdata.csv', header=True)

# making changes. Each transformation returns a new DataFrame; assigning it back to
# the variable 'voter_df' rebinds the name, so the original dataframe is no longer referenced
# CSV columns are read as strings, so cast 'year' to an integer before adding to it
voter_df = voter_df.withColumn('fullyear', voter_df.year.cast('int') + 2000)
voter_df = voter_df.drop(voter_df.year)

In [None]:
# lazy processing
# spark is efficient because of lazy processing (little occurs until an action is performed)
# In the case above, the datasets were not actually read, added to, or modified. We only updated the instructions; nothing runs until an action is called.
# This illustrates that spark has two kinds of operations: transformations (lazy) and actions (eager)

# count() is treated by spark as an action, so the plan of all prior transformations is executed to obtain this result
voter_df.count()

In [None]:
# more lazy processing
# option() takes the option name and its value as two positional arguments
# (calling it as option(Header=True) raises a TypeError)
sample_df = spark.read.format('csv').option('header', True).load('dummy_csv.csv')  # lazy operation
sample_df = sample_df.withColumn('new_col_name', F.lower(sample_df['original_col_name']))  # lazy operation
sample_df = sample_df.drop(sample_df['original_col_name'])  # lazy operation
sample_df.show()  # action (eager): triggers execution of the steps above

In [None]:
# Understanding parquet

# difficulties with CSVs - no defined schema, nested data requires special handling, limited encoding formats
# spark has problems processing CSV data (slow to parse, files can't be shared between workers during import, and all data must be read before a schema can be inferred if none is defined)
# spark has 'predicate pushdown' (the idea is to order tasks so that the least amount of work is done; e.g. filtering data before processing it is one way to optimize via 'predicate pushdown')
# CSVs cannot be filtered via predicate pushdown
# Spark processes are multi-step and may use an intermediate file representation.

# parquet files - compressed columnar format
# structured in chunks, allowing read/write operations without processing the entire file.
# this format supports predicate pushdown for performance improvement
# parquet files contain schema information

# working with parquet
# reading parquet:
df = spark.read.format('parquet').load('filename.parquet')  # long version has access to more flags
df = spark.read.parquet('filename.parquet')  # same but shortcut

# writing
# Write to a separate output path: the default save mode raises an error when the
# target already exists, and Spark cannot overwrite a path it is currently reading from.
# mode('overwrite') lets the second (equivalent) write replace the output of the first.
df.write.format('parquet').mode('overwrite').save('filename_out.parquet')  # long version has access to more flags (i.e. ok to overwrite file, etc)
df.write.mode('overwrite').parquet('filename_out.parquet')  # same but shortcut


In [None]:
# parquet and SQL
# DataFrames loaded from parquet work well with SQL operations
flight_df = spark.read.parquet('filename.parquet')

# Register the DataFrame under the view name 'flights' so SQL can refer to it
flight_df.createOrReplaceTempView('flights')

# Query the temporary view by name; the result is another DataFrame
short_flights_df = spark.sql(
    "SELECT * FROM flights WHERE flightduration < 100"
)

In [None]:
# practice with SQL in spark
flights_df = spark.read.parquet('dummy_df.parquet')
# The method that registers a DataFrame for SQL is createOrReplaceTempView
# (DataFrames have no createOrReplace method)
flights_df.createOrReplaceTempView('flights')
# collect() is an action that returns a list of Row objects; [0] takes the single
# Row produced by the aggregate. A Row is a tuple, so '%d' below formats its one value.
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
print("The average flight time is : %d" % avg_duration)

In [None]:
# UNIT 2 : DATAFRAME COLUMN OPERATIONS
# Notes:
# DATAFRAMES ARE IMMUTABLE, made up of rows and columns. use transformation operations to modify data
df.filter(df.name.like('M%'))  # SQL LIKE pattern: names starting with 'M'
df.select('name', 'position')  # each column name is passed as its own argument

# common transforms:
# filter (like where in sql)
df.filter(df.column > 100) # or df.where(...)

# select - returns requested columns
df.select('col_name')

# withColumn - creates a new column in the dataframe
# first arg is the new col name, second is the column expression that creates it.
# F.year() extracts the year from a date column ('df.date.year' would instead try
# to access a struct field called 'year')
df.withColumn('year', F.year(df.date))

# drop
df.drop('unused_col')

In [None]:
# FILTERING DATA
# remove nulls, remove odd entries, split data from combined sources, negate with ~
df.filter(df['name'].isNotNull())
# F.year() extracts the year from a date column ('df.date.year' would instead try
# to access a struct field called 'year')
df.filter(F.year(df.date) > 1800)
df.where(df['_c0'].contains('VOTE'))
df.where(~ df._c1.isNull())

In [None]:
# COLUMN STRING TRANSFORMATIONS
# many of these functions are contained in the pyspark.sql.functions library

import pyspark.sql.functions as F

df.withColumn('col_name', F.upper('name'))

# we can also create intermediary columns that are useful for processing
# split on a single space so that each word becomes one element of the list
df.withColumn('splits', F.split('name', ' ')) # returns list of words in a column called splits

# casting string to integer
df.withColumn('year', df['_c4'].cast(IntegerType()))

# possibility exists to work with ArrayType() columns (analogous to python lists)
# size is a function in pyspark.sql.functions; getItem is a method on a column
F.size('column') # returns length of ArrayType() column
F.col('column').getItem(0) # used to retrieve a specific item of a list column (takes an index, and returns the value at that index)

In [None]:
# split is a function in pyspark.sql.functions (DataFrames have no split method);
# the raw string keeps the backslash of the regex '\s+' (one or more whitespace) intact.
# Each result is assigned back because withColumn returns a new DataFrame; without
# the assignment the 'splits' column would not exist for the next lines.
voter_df = voter_df.withColumn('splits', F.split(voter_df['voter_name'], r'\s+'))

voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0)) # returns the first element of each row's list.
# size - 1 is the index of the last element of each row's list
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

In [None]:
# conditional clauses
# optimized built in conditionals. 

# .when() and .otherwise()

from pyspark.sql.functions import when

# .when(<if condition>, <then x>)
# note that select can create columns dynamically from expressions. here it produces
# a column that is "Adult" where the condition holds and NULL elsewhere; the column
# gets an auto-generated name unless .alias() is used
df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult"))


# chaining multiple whens together
# this is the equivalent of an if / elif chain: the first matching condition wins
df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult").when(df.Age < 18, "Minor"))

# contrasting is the use of the .otherwise() clause (essentially an 'else')
df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult").otherwise("Minor"))





In [None]:
# user defined functions (UDF)

# once the udf is written, it is called using the pyspark.sql.functions.udf method
from pyspark.sql.functions import udf

# reverse string udf:
# first define a python function
def reverseString(mystr):
    """Return ``mystr`` with its characters in reverse order.

    Returns None when ``mystr`` is None, so that a NULL value in the input
    column stays NULL instead of raising a TypeError inside the UDF.
    """
    if mystr is None:
        return None
    return mystr[::-1]

# wrap the python function as a Spark UDF and keep it in a variable for later use
# udf() receives the python function you defined and the Spark datatype that function returns
udfReverseString = udf(reverseString, returnType=StringType())

# to use it with spark, add a column to the df; the second argument is the UDF
# applied to the input column
user_df = user_df.withColumn('ReverseName', udfReverseString(user_df['Name']))


# UDFs that do not require arguments
def sortingCap():
    """Return one of the letters 'G', 'H', 'R' or 'S', picked at random."""
    # imported here because the notebook never imports random at module level
    import random

    return random.choice(['G', 'H', 'R', 'S'])

# sortingCap returns a different value on each call, so mark the UDF as
# non-deterministic: Spark treats UDFs as deterministic by default and may
# otherwise eliminate or duplicate invocations during optimization
udfSortingCap = udf(sortingCap, StringType()).asNondeterministic()
user_df = user_df.withColumn("Class", udfSortingCap())

In [None]:
# partitioning and lazy processing:

# dataframes are broken up into partitions
# partition size can vary and be optimized
# for now, consider that each partition runs independently

# transformations are LAZY (more like recipe than a command) .withColumn .select
# nothing is actually done until an action is performed .count(), .write()

# transformations can be re-ordered for the best performance


# adding IDs: sequential (serial) numbering does not parallelize well, so use 'monotonically_increasing_id()', which generates values that are increasing and unique.
# these ids are not necessarily sequential and can have gaps.

# spark is parallel: each partition is allocated up to ~8.6 billion IDs to assign. the ids are 64-bit numbers split
# into groups based on the spark partition. each group contains ~8.6 billion ids (2^33) and there are ~2.1 billion groups (2^31), none of which overlap.



In [6]:
from utils.utils import create_spark_postgres

In [7]:
# Load the 'farmers_markets' table from the 'gis_analysis' server via the project helper.
# NOTE(review): presumably returns a Spark DataFrame read from Postgres — confirm against utils.utils.create_spark_postgres
df=create_spark_postgres(server_name='gis_analysis', table_name='farmers_markets')

In [9]:
# show() is an action: it prints the first rows of the DataFrame as a text table,
# truncating long cell values (e.g. 'Freshest Cargo: M...' in the output below)
df.show()

                                                                                

+-------+--------------------+--------------------+-----------+--------------------+--------------------+-----+-----------+----------+-------+--------------------+
|   fmid|         market_name|              street|       city|              county|                  st|  zip|  longitude|  latitude|organic|          geog_point|
+-------+--------------------+--------------------+-----------+--------------------+--------------------+-----+-----------+----------+-------+--------------------+
|2000002|             Dig It!|                NULL|       NULL|                NULL|        Pennsylvania| NULL|       NULL|      NULL|      -|                NULL|
|2000004|     Farm a la Carte|                NULL|       NULL|                NULL|             Georgia| NULL|       NULL|      NULL|      -|                NULL|
|2000009|Freshest Cargo: M...|                NULL|       NULL|                NULL|          California| NULL|       NULL|      NULL|      -|                NULL|
|2000013|Green M