### Part 1: Setup

In [1]:
import findspark

In [2]:
# Locate the local Spark installation, then initialise findspark before pyspark is
# imported in the next cell.
# NOTE(review): find() is not the last expression of this cell, so its return value is
# never displayed; init() presumably performs the same lookup itself -- confirm whether
# the explicit find() call is still needed.
findspark.find()
findspark.init()

In [5]:
import pyspark
from pyspark.sql import SparkSession

### Part 2: Starting the Session

In [4]:
# Create (or reuse) the SparkSession used by the rest of this notebook.
# Prerequisites: the JAVA_HOME, SPARK_HOME and HADOOP_HOME environment variables must be
# set correctly, and the cell that imports SparkSession must have been run first
# (otherwise this cell fails with a NameError).
spark = (
    SparkSession.builder
    .appName("Practice Session")
    .getOrCreate()
)

NameError: name 'SparkSession' is not defined

In [5]:
# Check Spark session info: a bare `spark` as the last expression of the cell displays
# the session object created in the previous cell
spark

In [6]:
# Load the CSV file into a Spark DataFrame.
#   header      -> use the first row of the file as the column names
#   inferSchema -> let Spark infer each column's type from the data
dfSparkData = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("IronMan2023.csv")
)

In [17]:
# Two ways to print the first rows of the DataFrame as a text table.

# 1) DataFrame API
dfSparkData.show()

# 2) SQL: register the DataFrame as a temporary view, then query that view by name
dfSparkData.createOrReplaceTempView('ironman')
(
    spark
    .sql("SELECT * FROM ironman")
    .show()
)

+----+------------------+-------------+-----------+------------+------+--------+------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+-----------------+-------------------+-------------------+
|Year|  Participant Name|Division Rank|Gender Rank|Overall Rank|   Bib|Division|Points|          Swim Time|          Bike Time|           Run Time|       Overall Time|Swim Division Rank|Bike Division Rank|Run Division Rank|            T1 Time|            T2 Time|
+----+------------------+-------------+-----------+------------+------+--------+------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+-----------------+-------------------+-------------------+
|2023|       Sam Laidlow|            1|          1|           1|POINTS|       3|  MPRO|2023-10-01 00:47:50|2023-10-01 04:31:28|2023-10-01 02:41:46|2023-10-01 08:06:22|                 5|                 1|   

### Part 3: Playing with the DataFrame

In [18]:
# Print the DataFrame's schema as a tree: one line per column with its type and
# whether it is nullable
dfSparkData.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Participant Name: string (nullable = true)
 |-- Division Rank: integer (nullable = true)
 |-- Gender Rank: integer (nullable = true)
 |-- Overall Rank: integer (nullable = true)
 |-- Bib: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- Points: string (nullable = true)
 |-- Swim Time: timestamp (nullable = true)
 |-- Bike Time: timestamp (nullable = true)
 |-- Run Time: timestamp (nullable = true)
 |-- Overall Time: timestamp (nullable = true)
 |-- Swim Division Rank: integer (nullable = true)
 |-- Bike Division Rank: integer (nullable = true)
 |-- Run Division Rank: integer (nullable = true)
 |-- T1 Time: timestamp (nullable = true)
 |-- T2 Time: timestamp (nullable = true)



In [19]:
# Check the Python type of the object (a PySpark DataFrame)
type(dfSparkData)

pyspark.sql.dataframe.DataFrame

In [31]:
# Display the column names as a Python list of strings
dfSparkData.columns

['Year',
 'Participant Name',
 'Division Rank',
 'Gender Rank',
 'Overall Rank',
 'Bib',
 'Division',
 'Points',
 'Swim Time',
 'Bike Time',
 'Run Time',
 'Overall Time',
 'Swim Division Rank',
 'Bike Division Rank',
 'Run Division Rank',
 'T1 Time',
 'T2 Time']

In [21]:
# Project a subset of columns and print the result. select() accepts the column names
# either as separate arguments (as here) or as a single list; one name is fine too.
dfSparkData.select('Participant Name', 'Overall Rank').show()

+------------------+------------+
|  Participant Name|Overall Rank|
+------------------+------------+
|       Sam Laidlow|           1|
|     Patrick Lange|           2|
|     Magnus Ditlev|           3|
|     Rudy Von Berg|           4|
|    Leon Chevalier|           5|
|    Arthur Horseau|           6|
|     Bradley Weiss|           7|
|   Gregory Barnaby|           8|
|Robert Wilkowiecki|           9|
|    Clement Mignon|          10|
| Matthew Marquardt|          11|
|   Arnaud Guilloux|          12|
|     Benjamin Hill|          13|
|      Cameron Wurf|          14|
|     Niek Heldoorn|          15|
|     Braden Currie|          16|
|  Mathias Petersen|          17|
|        Remi Conte|          18|
|    Leonard Arnold|          19|
|    Jonas Hoffmann|          20|
+------------------+------------+
only showing top 20 rows



In [13]:
# show() only prints. To keep the rows of a query for later use, call collect()
# instead: it returns the matching rows as a Python list of Row objects.
result = (
    dfSparkData
    .filter(dfSparkData['Participant Name'] == "Sam Laidlow")
    .collect()
)
print(result)

[Row(Year=2023, Participant Name='Sam Laidlow', Division Rank=1, Gender Rank=1, Overall Rank=1, Bib='POINTS', Division='3', Points='MPRO', Swim Time=datetime.datetime(2023, 9, 30, 0, 47, 50), Bike Time=datetime.datetime(2023, 9, 30, 4, 31, 28), Run Time=datetime.datetime(2023, 9, 30, 2, 41, 46), Overall Time=datetime.datetime(2023, 9, 30, 8, 6, 22), Swim Division Rank=5, Bike Division Rank=1, Run Division Rank=1, T1 Time=datetime.datetime(2023, 9, 30, 0, 3, 6), T2 Time=datetime.datetime(2023, 9, 30, 0, 2, 12))]


In [22]:
# A Row can be converted to a plain dictionary keyed by column name, which makes
# individual fields easier to access. Here the first collected row is converted.
rowObject = result[0]
rowObject.asDict()


{'Year': 2023,
 'Participant Name': 'Sam Laidlow',
 'Division Rank': 1,
 'Gender Rank': 1,
 'Overall Rank': 1,
 'Bib': 'POINTS',
 'Division': '3',
 'Points': 'MPRO',
 'Swim Time': datetime.datetime(2023, 9, 30, 0, 47, 50),
 'Bike Time': datetime.datetime(2023, 9, 30, 4, 31, 28),
 'Run Time': datetime.datetime(2023, 9, 30, 2, 41, 46),
 'Overall Time': datetime.datetime(2023, 9, 30, 8, 6, 22),
 'Swim Division Rank': 5,
 'Bike Division Rank': 1,
 'Run Division Rank': 1,
 'T1 Time': datetime.datetime(2023, 9, 30, 0, 3, 6),
 'T2 Time': datetime.datetime(2023, 9, 30, 0, 2, 12)}

In [39]:
# Describe the data: print per-column summary statistics (the output includes rows
# such as count and mean)
dfSparkData.describe().show()

+-------+------+----------------+------------------+-----------------+-----------------+--------+------------------+------+---------+---------+--------+------------+------------------+------------------+------------------+--------+--------+
|summary|  Year|Participant Name|     Division Rank|      Gender Rank|     Overall Rank|     Bib|          Division|Points|Swim Time|Bike Time|Run Time|Overall Time|Swim Division Rank|Bike Division Rank| Run Division Rank| T1 Time| T2 Time|
+-------+------+----------------+------------------+-----------------+-----------------+--------+------------------+------+---------+---------+--------+------------+------------------+------------------+------------------+--------+--------+
|  count|  1950|            1950|              1950|             1950|             1950|    1950|              1950|  1950|     1950|     1950|    1950|        1950|              1950|              1950|              1950|    1950|    1950|
|   mean|2023.0|            null|117

In [45]:
# Add a column: pass the new column's name and a Column expression that computes its
# values. withColumn() returns a new DataFrame, so the result has to be assigned for
# the added column to be kept.
dfSparkData = dfSparkData.withColumn(
    'Even Rank',
    (dfSparkData['Overall Rank'] % 2) == 0,
)

In [50]:
# Dropping columns: the result of drop() is assigned back to keep the change (this
# removes the 'Even Rank' column added in the previous cell)
dfSparkData = dfSparkData.drop('Even Rank')

In [53]:
# Renaming columns: withColumnRenamed(existing_name, new_name)
# NOTE(review): the column list shown earlier in this notebook has no 'Points?' column
# and already has a 'Bib' column, so this call does not appear to rename anything --
# confirm which column was meant to be renamed, and to what.
dfSparkData = dfSparkData.withColumnRenamed('Points?', 'Bib')

### Part 4: Cleaning the Data

In [57]:
# Drop rows that contain null values. Optional arguments: how, thresh, subset.
# The result is not assigned, so dfSparkData itself is left unchanged; the cell only
# displays the returned DataFrame.
dfSparkData.dropna()

DataFrame[Year: string, Participant Name: string, Division Rank: string, Gender Rank: string, Overall Rank: string, Bib: string, Division: string, Points: string, Swim Time: string, Bike Time: string, Run Time: string, Overall Time: string, Swim Division Rank: string, Bike Division Rank: string, Run Division Rank: string, T1 Time: string, T2 Time: string]

In [59]:
# Replace missing values with a placeholder. Arguments: value, subset.
# The result is not assigned, so dfSparkData itself is left unchanged; the cell only
# displays the returned DataFrame.
dfSparkData.fillna('Missing Value')

DataFrame[Year: string, Participant Name: string, Division Rank: string, Gender Rank: string, Overall Rank: string, Bib: string, Division: string, Points: string, Swim Time: string, Bike Time: string, Run Time: string, Overall Time: string, Swim Division Rank: string, Bike Division Rank: string, Run Division Rank: string, T1 Time: string, T2 Time: string]

In [23]:
# Another way to fill in missing values: Imputer, from PySpark's machine-learning
# library, replaces missing entries according to a rule (here, the column mean).
# The imputed columns must be numeric.
from pyspark.ml.feature import Imputer

# Each input column gets a companion output column named "<column>_imputed".
rankColumns = ["Overall Rank", "Division Rank", "Gender Rank"]
imputer = Imputer(
    strategy="mean",
    inputCols=rankColumns,
    outputCols=[f"{name}_imputed" for name in rankColumns],
)
imputer.fit(dfSparkData).transform(dfSparkData).show()

+----+------------------+-------------+-----------+------------+------+--------+------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+-----------------+-------------------+-------------------+--------------------+---------------------+-------------------+
|Year|  Participant Name|Division Rank|Gender Rank|Overall Rank|   Bib|Division|Points|          Swim Time|          Bike Time|           Run Time|       Overall Time|Swim Division Rank|Bike Division Rank|Run Division Rank|            T1 Time|            T2 Time|Overall Rank_imputed|Division Rank_imputed|Gender Rank_imputed|
+----+------------------+-------------+-----------+------------+------+--------+------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+-----------------+-------------------+-------------------+--------------------+---------------------+-------------------+
|2023|       Sam La

### Part 5: Filtering Operations

In [71]:
# Filter rows on a condition. filter() accepts the condition in two forms.

# 1) A SQL expression string
dfSparkData.filter("Bib == 'POINTS'").show()

# 2) A Column expression. 'Overall Rank' is an integer column in the schema printed
#    earlier, so compare it with an integer literal rather than relying on an implicit
#    cast of the string '50'.
dfSparkData.filter(dfSparkData['Overall Rank'] == 50).show()

+----+------------------+-------------+-----------+------------+------+--------+------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+-----------------+-------------------+-------------------+
|Year|  Participant Name|Division Rank|Gender Rank|Overall Rank|   Bib|Division|Points|          Swim Time|          Bike Time|           Run Time|       Overall Time|Swim Division Rank|Bike Division Rank|Run Division Rank|            T1 Time|            T2 Time|
+----+------------------+-------------+-----------+------------+------+--------+------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+-----------------+-------------------+-------------------+
|2023|       Sam Laidlow|            1|          1|           1|POINTS|       3|  MPRO|2023-09-26 00:47:50|2023-09-26 04:31:28|2023-09-26 02:41:46|2023-09-26 08:06:22|                 5|                 1|   

### Part 6: GroupBy and Aggregate Functions

In [80]:
# groupBy() on its own returns a grouped-data object that aggregate methods can be
# called on. It is not displayed here because it is not the cell's last expression.
dfSparkData.groupBy('Participant Name')

# Chaining an aggregate after groupBy() gives a DataFrame again: here, the number of
# rows for each distinct value of 'Points'.
(
    dfSparkData
    .groupBy('Points')
    .count()
    .show()
)

+------+-----+
|Points|count|
+------+-----+
|POINTS|    1|
|M25-29|  124|
|M50-54|  290|
|M45-49|  282|
|  MPRO|   32|
|M30-34|  223|
|M35-39|  266|
|    HC|    1|
|M70-74|    4|
|M40-44|  315|
|M75-79|    1|
|M65-69|   36|
|M55-59|  204|
|M60-64|  104|
|M18-24|   67|
+------+-----+



### Part 7: Prediction
    

### Part 8: Streaming Data

In [14]:
# Transform the incoming lines of text into a word count.
# NOTE(review): `inputSources` is defined outside this cell; it is assumed to be a
# DataFrame with a string column named "value" -- confirm.
from pyspark.sql.functions import col, explode, split

# split() yields one array of tokens per line. explode() turns that array into one row
# per token, so the groupBy below counts individual words instead of whole token arrays.
outputWords = inputSources.select(explode(split(col("value"), "\\s")).alias("word"))
wordCount = outputWords.groupBy("word").count()