# Create the Spark session
This is required to create the first dataframe and start working with Spark.

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/spark_session.html

In [None]:
# First of all, we need to install the package
!pip install pyspark

In [None]:
import os
import sys

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Path to the resources folder of the project, use the full path on Windows
HADOOP_HOME = r"./hadoop_home"
# Use the full path on Windows for PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON, for instance: r"C:\SOFT\Python3\python.exe"
PYSPARK_PYTHON = r"python.exe"
PYSPARK_DRIVER_PYTHON = r"python.exe"
# Need to provide the path to the PostgreSQL driver to create a connection later on
POSTGRESQL_DRIVER_PATH = r"<DIRECTORY>/postgresql-42.3.6.jar"

if __name__ == "__main__":
    os.environ["HADOOP_HOME"] = HADOOP_HOME
    # Expose Hadoop's "bin" folder through the PATH environment variable.
    # sys.path only drives Python imports, so appending to it does not make the
    # Hadoop binaries visible to the JVM started by Spark.
    # The check keeps the cell idempotent when it is executed more than once.
    hadoop_bin = os.path.abspath(os.path.join(HADOOP_HOME, "bin"))
    path_entries = [entry for entry in os.environ.get("PATH", "").split(os.pathsep) if entry]
    if hadoop_bin not in path_entries:
        os.environ["PATH"] = os.pathsep.join([hadoop_bin] + path_entries)
    # Interpreter used by the Spark workers and by the driver, respectively
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_DRIVER_PYTHON

    # Create the configuration in the local machine and give a name to the application (which will appear in the GUI)
    conf = SparkConf() \
        .set("spark.master", "local") \
        .set("spark.app.name", "Spark Dataframes Tutorial") \
        .set("spark.jars", POSTGRESQL_DRIVER_PATH)

    # Create the session (an already running one is reused by getOrCreate)
    spark = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()
    # Report the versions in use and the effective configuration
    print(f"Python version = {spark.sparkContext.pythonVer}")
    print(f"Spark version = {spark.version}")
    print(spark.sparkContext.getConf().getAll())

In [None]:
# Inspect the master the session runs on and the parallelism that results from it
# (a degree equal to the number of cores is obtained with "*" in the master)
spark_context = spark.sparkContext
print("Master: ", spark.conf.get("spark.master"))
print("Parallelism: ", spark_context.defaultParallelism)
print("Minimum number of partitions: ", spark_context.defaultMinPartitions)

In [None]:
# Stop the session and immediately create a new one from the same configuration,
# so that the following cells still have a live session when the notebook is run
# top to bottom.
# To change the degree of parallelism, update the master before rebuilding,
# for instance: conf.set("spark.master", "local[*]")
spark.stop()
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()

# Create a Dataframe

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html

In [None]:
# Rows as plain tuples: no attribute names are attached to the values
data = (
    ('a', 1, 1.1, True),
    ('b', 2, 2.2, False),
    ('c', 3, None, True),
    ('d', 4, 4.4, None),
    ('e', 5, 5.5, True),
)
# Build a dataframe out of the tuples and display it
unnamed_df = spark.createDataFrame(data)
unnamed_df.show()

In [None]:
# Same unnamed rows as before
data = (
    ('a', 1, 1.1, True),
    ('b', 2, 2.2, False),
    ('c', 3, None, True),
    ('d', 4, 4.4, None),
    ('e', 5, 5.5, True),
)
# This time the attribute names are supplied when building the dataframe
attribute_names = ("A", "B", "C", "D")
named_df = spark.createDataFrame(data, schema=attribute_names)
named_df.show()

In [None]:
# One dictionary per row; a key that is left out means the value is missing in that row
data = (
    dict(A='a', B=1, C=1.1, D=True),
    dict(A='b', B=2, C=2.2, D=False),
    dict(A='c', B=3, D=True),
    dict(A='d', B=4, C=4.4),
    dict(A='e', B=5, C=5.5, D=True),
)
# Build the dataframe used throughout the rest of the notebook
df = spark.createDataFrame(data)

In [None]:
%%time
# Show the data (%%time reports how long the cell took to run)
df.show()

In [None]:
%%time
# Describe the Dataframe: display summary statistics of its columns
df.summary().show() # Also with "df.describe().show()"

In [None]:
# Keep the dataframe in memory once built, so later cells can reuse it
# without rebuilding it
df=df.cache()
print("The dataframe is kept in memory now!!!")

## A Spark Dataframe is different from a Pandas Dataframe

In [None]:
# Transform the Spark Dataframe into a Pandas Dataframe
# (the resulting Pandas Dataframe is the output displayed by the cell)
df.toPandas()

In [None]:
# List the column names first ...
column_names = df.columns
print("Columns: ", column_names)
# ... then print the schema of the dataframe
print("Schema:")
df.printSchema()

In [None]:
# Referring to a column yields a Column object, not the values it contains
# We can do it by name or by position
df["A"] # Equivalent to "df[0]"

In [None]:
# Rows and columns cannot be directly referred like in Pandas,
# so the dataframe is converted to Pandas first.
# The conversion is done once and reused, instead of once per print.
pandas_df = df.toPandas()
print("Column: \n", pandas_df["A"])
print("Row: \n", pandas_df.iloc[0])

In [None]:
# Look at the dataframe through its underlying RDD
underlying_rdd = df.rdd
# The number of partitions should coincide with the parallelism configured
print("Number of partitions: ", underlying_rdd.getNumPartitions())
# Content of the partitions, one list per partition
print("Partitions: ", underlying_rdd.glom().collect())

# Actions

## count

In [None]:
# Counts how many rows there are in the dataframe
df.count()

## first

In [None]:
# Retrieves the first row of the dataframe
# A Row is simply a tuple
df.first()

## collect

In [None]:
%%time
# Retrieves all the rows in the dataframe
# The result is simply a list of tuples (Rows)
df.collect()

## take

In [None]:
# Three ways of retrieving a fixed number of rows
n_rows = 3
print("Take", df.take(n_rows))
print("Head", df.head(n_rows))
print("Tail", df.tail(n_rows))

## write

In [None]:
# CSV output with a header line; existing output is overwritten
df.write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("output/CSVfile")
print("CSV written!!!")
# JSON output; existing output is overwritten
df.write \
    .mode("overwrite") \
    .json("output/JSONfile")
print("JSON written!!!")

### Writing/Reading a partitioned Dataframe

In [None]:
# Redistribute the rows over five partitions
dfRepartitioned = df.repartition(5)
repartitioned_rdd = dfRepartitioned.rdd
# Number of partitions after repartitioning
print("Number of partitions: ", repartitioned_rdd.getNumPartitions())
# Content of each partition
print("Partitions: ", repartitioned_rdd.glom().collect())
# Write the repartitioned dataframe as CSV with header
dfRepartitioned.write.csv(
    path="output/CSVfilePartitioned",
    header='True',
    mode="overwrite",
)
print("CSV written!!!")
# Write the repartitioned dataframe as JSON
dfRepartitioned.write.json(
    path="output/JSONfilePartitioned",
    mode="overwrite",
)
print("JSON written!!!")

In [None]:
# Read back the CSV output written by the previous cell
dfRecovered = spark.read.csv(
    path="output/CSVfilePartitioned",
    header='True',
    inferSchema='True',
)
recovered_rdd = dfRecovered.rdd
# The number of partitions depends on the number of files in the directory
# and on the degree of parallelism of the session
print("Number of partitions: ", recovered_rdd.getNumPartitions())
# Content of each partition
print("Partitions: ", recovered_rdd.glom().collect())
# Show the recovered data
dfRecovered.show()

# Load data into a Dataframe

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html

## From files

### Automatically infer the schema

In [None]:
# Location of the Airports dataset (tab-separated text)
airportsFilePath = "flight-data/airport-codes-na.txt" # From http://openflights.org/data.html

# Read it with a header line and let Spark infer the column types;
# the resulting dataframe is cached for the rest of the notebook
airports_reader = spark.read.options(header='true', inferSchema='true', sep='\t')
airports = airports_reader.csv(airportsFilePath).cache()
print("File loaded!!!")

In [None]:
# Show the schema of the airports dataframe (inferred while reading the file)
airports.printSchema()

In [None]:
# Show the first 5 rows of the dataframe
airports.show(5) # By default shows the top 20 rows

### Manually provide the schema

In [None]:
# Location of the Departure Delays dataset
flightPerfFilePath = "flight-data/departuredelays.csv" # From https://catalog.data.gov/dataset/airline-on-time-performance-and-causes-of-flight-delays-on-time-data
# Read it with a header line but WITHOUT schema inference, then cache the result
flightPerfUncached = spark.read.csv(
    flightPerfFilePath,
    header='true',
    inferSchema='false',
)
flightPerf = flightPerfUncached.cache()
print("File loaded!!!")

In [None]:
# Show the schema obtained when the file is read with inferSchema='false'
flightPerf.printSchema()
# The same schema, as the object exposed by the dataframe
print("... which actually corresponds to: ",flightPerf.schema)

In [None]:
# Show the first row of the flights dataframe
flightPerf.first()

In [None]:
# Show the basic statistics of each column
# (computed on the dataframe that was read without schema inference)
flightPerf.describe().show()

#### Create a more accurate schema

In [None]:
# Import only the types that are needed, instead of a wildcard import
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# The schema is encoded using StructType and using various pyspark.sql.types
# Each StructField receives: attribute name, data type, and whether NULL is allowed
# (False = not nullable)
newSchema = StructType([
    StructField("date", StringType(), False),
    StructField("delay", IntegerType(), False),
    StructField("distance", IntegerType(), False),
    StructField("origin", StringType(), False),
    StructField("destination", StringType(), False)
])
print("New schema created!!!")

In [None]:
# Dataframes are immutable, so the schema of an existing one cannot be replaced:
# the assignment below is expected to be rejected, and we have to define a new
# dataframe instead (see the following cells).
# The error is caught and displayed so that the notebook can still be run top to bottom.
try:
    flightPerf.schema = newSchema
except AttributeError as error:
    print("The schema cannot be assigned in place:", error)

In [None]:
# First, the two integer attributes have to be cast, one column at a time
casted = flightPerf
for attribute in ("delay", "distance"):
    casted = casted.withColumn(attribute, flightPerf[attribute].cast("Integer"))
casted.printSchema()

In [None]:
%%time
# Avoid using collect, use RDD instead
flightPerfWithSchema = spark.createDataFrame(data=casted.collect(), schema=newSchema, verifySchema=True) 
flightPerfWithSchema.printSchema()

In [None]:
%%time
# Avoid using collect, use RDD instead
flightPerfWithSchema = spark.createDataFrame(data=casted.rdd, schema=newSchema, verifySchema=True)
flightPerfWithSchema.printSchema()

In [None]:
# Keep the typed dataframe in memory, then compare the statistics of both versions
flightPerfWithSchema = flightPerfWithSchema.cache()
for label, frame in (("Without schema", flightPerf), ("With schema", flightPerfWithSchema)):
    print(label)
    frame.describe().show()

## From a Database table

In [None]:
# A Spark Dataframe can also be created from a pre-existing table through a JDBC connection
# (replace the <...> placeholders with the values of your own database)
jdbc_options = {
    "driver": "org.postgresql.Driver",
    "url": "jdbc:postgresql://localhost:5432/<DATABASENAME>",
    "dbtable": "<SCHEMA>.<TABLE>",
    "user": "<USERNAME>",
    "password": "<PASSWORD>",
}
JDBCdf = spark.read \
    .format("jdbc") \
    .options(**jdbc_options) \
    .load()
# This is equivalent to
# JDBCdf = spark.read.jdbc(url="jdbc:postgresql://localhost:5432/<DATABASENAME>", table="<SCHEMA>.<TABLE>", properties={"user": "<USERNAME>", "password": "<PASSWORD>", "driver": "org.postgresql.Driver"})
JDBCdf.show(10)

# Transformations

## withColumn

In [None]:
# We can derive new columns from existing ones
# Here the source column is referred to by position (df[2]) and doubled
df.withColumn("derived",df[2]*2).show()

In [None]:
from pyspark.sql.functions import lit
# New columns can be derived from existing ones or from constants.
# A bare Python value does not work, e.g. df.withColumn("constant",666).show();
# the constant has to be wrapped with lit()
doubled_b = df["B"] * 2
constant_value = lit(666)
df.withColumn("derived", doubled_b) \
    .withColumn("constant", constant_value) \
    .show()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
# Extend the dataframe with a surrogate key column named "Key"
surrogate_key = monotonically_increasing_id()
dfWithKey = df.withColumn("Key", surrogate_key)
dfWithKey.show()

## select

In [None]:
from pyspark.sql.functions import col

# There are multiple ways to refer to a column; every line below selects A and B
# 1) by name
df.select("A","B").show()
# 2) by unpacking a collection of names
columns=("A","B")
df.select(*columns).show()
# 3) as attributes of the dataframe
df.select(df.A,df.B).show()
# 4) by indexing the dataframe
df.select(df["A"],df["B"]).show()
# 5) through the col() function
df.select(col("A"),col("B")).show()

In [None]:
# We can select all the columns with the typical wildcard
df.select("*").show()

In [None]:
from pyspark.sql.functions import struct

# Several columns can be merged into a single one
# Here it must be a struct, because the columns are heterogeneous
# (homogeneous columns could form an array or map instead)
value_fields = ("A", "B", "C", "D")
value_column = struct(*value_fields).alias("Value")
dfKeyValue = dfWithKey.select("Key", value_column)
dfKeyValue.show()

In [None]:
#from pyspark.sql.functions import explode

# The elements of the struct are extracted one by one, each under its own name
# (for an array or map, "explode" could extract all of them at once)
extracted_fields = [
    col("Value")[field].alias(field)
    for field in ("A", "B", "C", "D")
]
dfKeyValue.select(*extracted_fields).show()

In [None]:
from pyspark.ml.feature import VectorAssembler

# Several columns can be merged into an ML vector,
# as long as they are numerical and contain no NULL
non_null_rows = df.select("B", "C").filter("C IS NOT NULL")
assembler = VectorAssembler(inputCols=["B", "C"], outputCol="features")
assembler.transform(non_null_rows).show()

## filter/where

This way, we can refer to any row without needing an explicit "index" column (i.e., any column can be used in the condition).

In [None]:
# Conditions are given as SQL-like expressions
# Rows whose A equals 'a'
df.filter("A='a'").show()
# Rows whose B equals 1
df.filter("B=1").show()
# "where" is used here with a compound condition: D holds and C is NULL
df.where("D AND C IS NULL").show()
# Drop the rows containing missing values
df.dropna().show()

## sample

In [None]:
# Draw a sample without replacement using a fraction of 0.4; the fixed seed makes it repeatable
df.sample(withReplacement=False, fraction=0.4, seed=666).show()

## distinct/dropDuplicates

In [None]:
# Removes duplicate rows (here, applied after keeping only column D)
df.select("D").distinct().show()

In [None]:
# Removes rows that have duplicate values in the given columns
# (all the columns are kept in the result)
df.dropDuplicates(["D"]).show()

## sort

In [None]:
from pyspark.sql.functions import asc, desc
# Sort by a single column
df.sort("A").show()
# Sort by several columns, each with its own direction
df.sort(asc("D"), desc("B")).show()

## replace

In [None]:
# We can replace several values at once, restricted to the columns given in "subset"
# (3 -> 33, 4 -> 44, 5 -> NULL in column B)
df.replace(to_replace=[3,4,5], value=[33,44,None], subset=["B"]).show()
print("Notice the original dataframe has not really changed!!!")
df.show()

In [None]:
# replace() does not work for NULL, e.g.: df.replace(to_replace=[None], value=[3.3], subset=["C"]).show()
# Use fillna() instead to impute the missing values of a column
df.fillna(3.3,"C").show()
# We can indicate a map for different imputations per column
df.fillna({"C": 3.3, "D": False}).show()

## groupBy

In [None]:
# Total number of rows, for reference
print(df.count())
# groupBy returns an object that allows applying aggregation functions to it
# That object cannot be shown or printed (it is not a Dataframe)
rows_by_d = df.groupBy("D")
rows_by_d.count().show()
rows_by_d.max("B", "C").show()

## agg

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

In [None]:
import pyspark.sql.functions as fn

grouped_by_d = df.groupBy("D")
# Count every row of each group
grouped_by_d.agg(fn.count("*")).show()
# Giving the column name, counts those not null
grouped_by_d.agg(fn.count("D")).show()
# Several aggregates can be computed at once
grouped_by_d.agg(fn.max("B"), fn.max("C")).show()

In [None]:
# Aggregating the dataframe directly is just a shorthand for df.groupBy().agg()
row_count = fn.count('*').alias('count')
df.agg(row_count).show()

## crossJoin

In [None]:
# Count the rows of the cross product between column A and column B of the dataframe
df.select("A").crossJoin(df.select("B")).count()

## join

In [None]:
# Copy of the dataframe whose columns carry an "R" prefix (RA, RB, RC, RD)
rendf = df
for attribute in ("A", "B", "C", "D"):
    rendf = rendf.withColumnRenamed(attribute, "R" + attribute)
# Join the dataframe with its renamed copy on A and C
join_conditions = [df.A==rendf.RA, df.C==rendf.RC]
df.join(rendf, join_conditions).show() # What happened with row number 3?

In [None]:
# Let's take bigger Dataframes: print the number of rows of each one
print("Airports(Spark): ", airports.count())
print("Flights(Spark): ", flightPerfWithSchema.count())

In [None]:
%%time
# Join the two big Dataframes
flightPerfWithSchema.join(airports, [flightPerfWithSchema.origin==airports.IATA]).count()

In [None]:
%%time
# Let's do the join as an operation derived from cross product and filter
# (compare the timing with the explicit join of the previous cell)
crossed = flightPerfWithSchema.crossJoin(airports)
crossed.where(crossed.origin==crossed.IATA).count()

In [None]:
# Convert both Spark Dataframes into Pandas ones
p_airports = airports.toPandas()
p_flightPerfWithSchema = flightPerfWithSchema.toPandas()
# Report the number of rows of each Pandas Dataframe
for label, frame in (("Airports(Pandas): ", p_airports), ("Flights(Pandas): ", p_flightPerfWithSchema)):
    print(label, len(frame))

In [None]:
%%time
# Let's take again the join as an operation derived from cross product and filter on Pandas Dataframes
# THIS CAN TAKE A WHILE!!!
# NOTE(review): the cross product is materialised before the filter is applied,
# so memory use may be very high with the full datasets - confirm before running
crossed = p_flightPerfWithSchema.join(p_airports, how="cross")
len(crossed[crossed["origin"]==crossed["IATA"]])

In [None]:
%%time
# Let's do only the cross product on the Spark Dataframe (no filter this time)
# and count the resulting rows
crossed = flightPerfWithSchema.crossJoin(airports)
crossed.count()

## repartition/coalesce

In [None]:
%%time
print("Number of Partitions: ", flightPerfWithSchema.rdd.getNumPartitions())
rep = flightPerfWithSchema.repartition(1)
print("Number of Partitions: ", rep.rdd.getNumPartitions())

In [None]:
%%time
print("Number of Partitions: ", flightPerfWithSchema.rdd.getNumPartitions())
coa = flightPerfWithSchema.coalesce(1)
print("Number of Partitions: ", coa.rdd.getNumPartitions())

## Transformations on RDD

### map
For each element in the RDD, `map` returns exactly one element

In [None]:
# Keep only the value of A from every row of the underlying RDD
df.rdd.map(lambda r: r["A"]).collect()

In [None]:
def transform_or_placeholder(r):
    """Return a derived tuple when the fourth value of the row is truthy, a fixed placeholder tuple otherwise."""
    if r[3]:
        return (r["A"], r["B"]-1, r["B"]*2, r["D"])
    return (None, "X", "Y", "Z")

# Exactly one output element per row
df.rdd.map(transform_or_placeholder).collect()

In [None]:
def transform_or_none(r):
    """Return a derived tuple when the fourth value of the row is truthy, None otherwise."""
    if r[3]:
        return (r["A"], r["B"]-1, r["B"]*2, r["D"])
    return None

# Still one output element per row: the rows that are not transformed yield None
df.rdd.map(transform_or_none).collect()

### flatmap
For each element in the RDD, `flatMap` returns a (potentially empty) list of elements

In [None]:
def duplicate_a_when_d(r):
    """Return the value of A twice when D is truthy, nothing otherwise."""
    if r["D"]:
        return [r["A"], r["A"]]
    return ()

# Each row contributes zero or two elements to the result
df.rdd.flatMap(duplicate_a_when_d).collect()

In [None]:
def pairs_when_fourth_value(r):
    """Return two pairs derived from the row when its fourth value is truthy, nothing otherwise."""
    if r[3]:
        return [(r["A"], r["B"]-1), (r["B"]*2, False)]
    return ()

# Each row contributes zero or two pairs to the result
df.rdd.flatMap(pairs_when_fourth_value).collect()

### reduce
Actually, this is an action (not a transformation)

In [None]:
# Extract the values of B ...
b_values = df.rdd.map(lambda row: row["B"])
# ... and add them all up
b_values.reduce(lambda left, right: left + right)

# SQL

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/catalog.html

## Link the tables

In [None]:
# Show the characteristics of the default database
# (the first entry of the databases listed by the catalog)
print(spark.catalog.listDatabases()[0])

In [None]:
# Describe the characteristics of the default database through a SQL statement
spark.sql("DESCRIBE DATABASE EXTENDED default;").show(truncate=True)

In [None]:
# Create a temporary view in the spark session default database for each one of the dataframes
# This allows accessing them as any other Relational table using SQL
airports.createOrReplaceTempView("Airports")
# Register the typed version of the flights: in "flightPerf" every attribute is a string,
# so numeric aggregates such as sum(delay) would depend on implicit type coercion
flightPerfWithSchema.createOrReplaceTempView("FlightPerformance")
print("Temporary views created!!!")

In [None]:
# Print the type and the name of every table known to the catalog
for table in spark.catalog.listTables():
    print(table.tableType, " ", table.name)

## Execute queries using standard SQL

In [None]:
# Query text kept apart from its execution
cities_query = """
SELECT City
FROM Airports
LIMIT 5;
"""
# The result of a query is again a dataframe
result = spark.sql(cities_query)
result.printSchema()
result.show()

In [None]:
# Query Sum of Flight Delays by City and Origin Code (for Washington State)
# The two temporary views are joined on the airport code, and the result is
# sorted from the largest to the smallest total delay
spark.sql("""
SELECT a.City, f.origin, sum(f.delay) as Delays 
FROM FlightPerformance f 
    JOIN airports a 
      ON a.IATA = f.origin 
WHERE a.State = 'WA' 
GROUP BY a.City, f.origin 
ORDER BY Delays desc;
""").show()

In [None]:
from pyspark.sql.utils import AnalysisException

# This does not work, only queries are allowed on the temporary views
# The statement is expected to be rejected; the error is caught and displayed
# instead of interrupting the execution of the notebook
try:
    spark.sql("""
INSERT INTO airports(City, State, Country, IATA) VALUES ('A','B','C','D');
""")
except AnalysisException as error:
    print("Statement rejected:", error)