### Spark fundamentals

In [2]:
# Import pyspark(Spark's python wrapper)
from pyspark.sql import SparkSession

In [3]:
# Create spark session
spark = SparkSession.builder.getOrCreate()

In [4]:
# Define file path
file_path = "./data/people.csv"

# Load data from a csv file
df = spark.read.csv(file_path, header=True)

# Preview data
df.show()

+-----+---------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
|Index|Organization Id|                Name|             Website|             Country|         Description|Founded|            Industry|Number of employees|
+-----+---------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
|    1|E84A904909dF528|          Liu-Hoover|http://www.day-ha...|      Western Sahara|Ergonomic zero ad...|   1980|   Online Publishing|               6851|
|    2|AAC4f9aBF86EAeF|       Orr-Armstrong|https://www.chapm...|             Algeria|Ergonomic radical...|   1970|     Import / Export|               7994|
|    3|ad2eb3C8C24DB87|           Gill-Lamb|     http://lin.com/|       Cote d'Ivoire|Programmable inte...|   2005|   Apparel / Fashion|               5105|
|    4|D76BB12E5eE165B|         Bauer-Weiss|https://gilles

In [5]:
# Define table name
table_name = "organizations"

# Create an SQL table 
df.createOrReplaceTempView(table_name)

In [6]:
# Define query
query = f"SELECT * FROM {table_name}"

# Query the table
spark.sql(query).show()

+-----+---------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
|Index|Organization Id|                Name|             Website|             Country|         Description|Founded|            Industry|Number of employees|
+-----+---------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
|    1|E84A904909dF528|          Liu-Hoover|http://www.day-ha...|      Western Sahara|Ergonomic zero ad...|   1980|   Online Publishing|               6851|
|    2|AAC4f9aBF86EAeF|       Orr-Armstrong|https://www.chapm...|             Algeria|Ergonomic radical...|   1970|     Import / Export|               7994|
|    3|ad2eb3C8C24DB87|           Gill-Lamb|     http://lin.com/|       Cote d'Ivoire|Programmable inte...|   2005|   Apparel / Fashion|               5105|
|    4|D76BB12E5eE165B|         Bauer-Weiss|https://gilles

In [6]:
# Inspect table schema
result = spark.sql(f"SHOW COLUMNS FROM {table_name}")

result.show()

+-------------------+
|           col_name|
+-------------------+
|              Index|
|    Organization Id|
|               Name|
|            Website|
|            Country|
|        Description|
|            Founded|
|           Industry|
|Number of employees|
+-------------------+



In [7]:
# Inspect table schema
result = spark.sql(f"SELECT * FROM {table_name} LIMIT 0")

print(result.columns)

['Index', 'Organization Id', 'Name', 'Website', 'Country', 'Description', 'Founded', 'Industry', 'Number of employees']


In [8]:
# Inspect table schema
result = spark.sql(f"DESCRIBE {table_name}")

result.show()

+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|              Index|   string|   null|
|    Organization Id|   string|   null|
|               Name|   string|   null|
|            Website|   string|   null|
|            Country|   string|   null|
|        Description|   string|   null|
|            Founded|   string|   null|
|           Industry|   string|   null|
|Number of employees|   string|   null|
+-------------------+---------+-------+



### Window Function SQL
* OVER and ORDERBY clauses

In [7]:
# Train schedule dataset
df = spark.read.csv("./data/train_schedule.csv", header=True)
df.createOrReplaceTempView("sched")

In [8]:
spark.sql("SELECT * FROM sched ORDER BY time").show()

+--------+--------------------+--------+
|train_id|             station|    time|
+--------+--------------------+--------+
|     110|      Sydney Central|01:05 PM|
|     104|        Gare du Nord|01:20 PM|
|     107|       Madrid Atocha|01:50 PM|
|     115|   Los Angeles Union|01:50 PM|
|     102|         Kings Cross|02:00 PM|
|     113|        Cairo Ramses|02:35 PM|
|     110|   Singapore Central|03:20 PM|
|     105|       Tokyo Station|03:35 PM|
|     108| Berlin Hauptbahnhof|04:05 PM|
|     102|    Shinjuku Station|04:20 PM|
|     113| Buenos Aires Retiro|04:50 PM|
|     111|Mexico City Terminal|05:35 PM|
|     105|   Hong Kong Station|05:50 PM|
|     108|  Amsterdam Centraal|06:20 PM|
|     103|            Waterloo|06:35 PM|
|     106|    Liverpool Street|07:05 AM|
|     114|Toronto Union Sta...|07:05 AM|
|     111|     Moscow Kazansky|07:50 AM|
|     101|       Grand Central|08:15 AM|
|     109|        Rome Termini|08:35 AM|
+--------+--------------------+--------+
only showing top

In [9]:
query = """
SELECT train_id, station, time,
LEAD(time, 1) OVER (ORDER BY time) AS time_next
FROM sched
WHERE train_id=101
"""

spark.sql(query).show()

24/01/25 11:23:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/25 11:23:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/25 11:23:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/25 11:23:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/25 11:23:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+--------+-------------+--------+---------+
|train_id|      station|    time|time_next|
+--------+-------------+--------+---------+
|     101|Grand Central|08:15 AM| 10:30 AM|

In [10]:
# Using the PARTITION BY clause in order to improve performance
query = """
SELECT train_id, station, time,
LEAD(time, 1) OVER (PARTITION BY train_id ORDER BY time) AS time_next
FROM sched
"""

spark.sql(query).show()

+--------+-------------------+--------+---------+
|train_id|            station|    time|time_next|
+--------+-------------------+--------+---------+
|     101|      Grand Central|08:15 AM| 10:30 AM|
|     101|       Penn Station|10:30 AM| 12:45 PM|
|     101|      Union Station|12:45 PM|     null|
|     102|        Kings Cross|02:00 PM| 04:20 PM|
|     102|   Shinjuku Station|04:20 PM|     null|
|     103|           Waterloo|06:35 PM| 08:50 AM|
|     103|   Victoria Station|08:50 AM|     null|
|     104|       Gare du Nord|01:20 PM| 11:05 AM|
|     104|       King's Cross|11:05 AM|     null|
|     105|      Tokyo Station|03:35 PM| 05:50 PM|
|     105|  Hong Kong Station|05:50 PM|     null|
|     106|   Liverpool Street|07:05 AM| 09:20 AM|
|     106|      Beijing South|09:20 AM|     null|
|     107|      Madrid Atocha|01:50 PM| 11:35 AM|
|     107|      Seoul Station|11:35 AM|     null|
|     108|Berlin Hauptbahnhof|04:05 PM| 06:20 PM|
|     108| Amsterdam Centraal|06:20 PM|     null|


In [11]:
#The following are example queries
query = """
SELECT 
ROW_NUMBER() OVER (ORDER BY time) AS row,
train_id, 
station, 
time, 
LEAD(time,1) OVER (ORDER BY time) AS time_next 
FROM schedule
"""

# Updated query -> Query did not include PARTITION BY clause as well as bad_row number
query = """
SELECT 
ROW_NUMBER() OVER (ORDER BY time) AS row,
train_id, 
station, 
time, 
LEAD(time,1) OVER (PARTITION BY train_id ORDER BY time) AS time_next 
FROM schedule
"""
spark.sql(query).show()

# Give the number of the bad row as an integer
bad_row = 7

# Provide the missing clause, SQL keywords in upper case
clause = 'PARTITION BY train_id'

24/01/25 11:23:59 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/01/25 11:23:59 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/01/25 11:23:59 WARN Hive: Failed to register all functions.
java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
	at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1709)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3662)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3714)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3694)
	at org.apache.hadoop.hive.ql.me

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

In [16]:
# Ways to select 2 columns
df.columns

['train_id', 'station', 'time']

In [12]:
df.show(5)

+--------+----------------+--------+
|train_id|         station|    time|
+--------+----------------+--------+
|     101|   Grand Central|08:15 AM|
|     101|    Penn Station|10:30 AM|
|     101|   Union Station|12:45 PM|
|     102|     Kings Cross|02:00 PM|
|     102|Shinjuku Station|04:20 PM|
+--------+----------------+--------+
only showing top 5 rows



In [19]:
# Show only 5 records and 2 columns
df.select('train_id', 'station').show(5)

+--------+----------------+
|train_id|         station|
+--------+----------------+
|     101|   Grand Central|
|     101|    Penn Station|
|     101|   Union Station|
|     102|     Kings Cross|
|     102|Shinjuku Station|
+--------+----------------+
only showing top 5 rows



In [21]:
# The same can be achieved using <dot > notation
df.select(df.train_id, df.station).show(5)

+--------+----------------+
|train_id|         station|
+--------+----------------+
|     101|   Grand Central|
|     101|    Penn Station|
|     101|   Union Station|
|     102|     Kings Cross|
|     102|Shinjuku Station|
+--------+----------------+
only showing top 5 rows



In [15]:
# The <col> function can also be imported
# This enables passing in column names as strings
from pyspark.sql.functions import col

In [23]:
df.select(col('train_id'), col('station')).show(5)

+--------+----------------+
|train_id|         station|
+--------+----------------+
|     101|   Grand Central|
|     101|    Penn Station|
|     101|   Union Station|
|     102|     Kings Cross|
|     102|Shinjuku Station|
+--------+----------------+
only showing top 5 rows



In [30]:
# Using the <withColumnRenamed> function
df.select('train_id', 'station').withColumnRenamed('train_id', 'train').show(5)

+-----+----------------+
|train|         station|
+-----+----------------+
|  101|   Grand Central|
|  101|    Penn Station|
|  101|   Union Station|
|  102|     Kings Cross|
|  102|Shinjuku Station|
+-----+----------------+
only showing top 5 rows



* Note -> Avoid using all 3 conventions at the same time

In [13]:
table_name = 'sched'
target_col = 'train_id'
sup_col = 'station'

spark.sql(f'SELECT {target_col} AS train, {sup_col} FROM {table_name} LIMIT 5').show()

+-----+----------------+
|train|         station|
+-----+----------------+
|  101|   Grand Central|
|  101|    Penn Station|
|  101|   Union Station|
|  102|     Kings Cross|
|  102|Shinjuku Station|
+-----+----------------+



In [18]:
# Using dot notation to achieve the same results
df.select(col('train_id').alias('train'), 'station').limit(5).show()

+-----+----------------+
|train|         station|
+-----+----------------+
|  101|   Grand Central|
|  101|    Penn Station|
|  101|   Union Station|
|  102|     Kings Cross|
|  102|Shinjuku Station|
+-----+----------------+



In [22]:
# Using Window Functions to achieve the same results

# The following query adds a number to each stop on a train line -- in a new column called id 
query = """
SELECT *,
ROW_NUMBER() OVER(PARTITION BY train_id ORDER BY time) AS id
FROM sched
"""

spark.sql(query).show(11)


+--------+-----------------+--------+---+
|train_id|          station|    time| id|
+--------+-----------------+--------+---+
|     101|    Grand Central|08:15 AM|  1|
|     101|     Penn Station|10:30 AM|  2|
|     101|    Union Station|12:45 PM|  3|
|     102|      Kings Cross|02:00 PM|  1|
|     102| Shinjuku Station|04:20 PM|  2|
|     103|         Waterloo|06:35 PM|  1|
|     103| Victoria Station|08:50 AM|  2|
|     104|     Gare du Nord|01:20 PM|  1|
|     104|     King's Cross|11:05 AM|  2|
|     105|    Tokyo Station|03:35 PM|  1|
|     105|Hong Kong Station|05:50 PM|  2|
+--------+-----------------+--------+---+
only showing top 11 rows



In [27]:
# Using Window Functions to achieve the same results with dot notation
from pyspark.sql import Window
from pyspark.sql.functions import row_number

df.withColumn("id", row_number().over(Window.partitionBy('train_id').orderBy('time'))).show(11)

+--------+-----------------+--------+---+
|train_id|          station|    time| id|
+--------+-----------------+--------+---+
|     101|    Grand Central|08:15 AM|  1|
|     101|     Penn Station|10:30 AM|  2|
|     101|    Union Station|12:45 PM|  3|
|     102|      Kings Cross|02:00 PM|  1|
|     102| Shinjuku Station|04:20 PM|  2|
|     103|         Waterloo|06:35 PM|  1|
|     103| Victoria Station|08:50 AM|  2|
|     104|     Gare du Nord|01:20 PM|  1|
|     104|     King's Cross|11:05 AM|  2|
|     105|    Tokyo Station|03:35 PM|  1|
|     105|Hong Kong Station|05:50 PM|  2|
+--------+-----------------+--------+---+
only showing top 11 rows



In [28]:
# Consider the following examples
# Give the identical result in each command
spark.sql('SELECT train_id, MIN(time) AS start FROM sched GROUP BY train_id').show()
df.groupBy('train_id').agg({'time':'min'}).withColumnRenamed('min(time)', 'start').show()

# Print the second column of the result
spark.sql('SELECT train_id, MIN(time), MAX(time) FROM sched GROUP BY train_id').show()
result = df.groupBy('train_id').agg({'time':'min', 'time':'max'})
result.show()
print(result.columns[1])

+--------+--------+
|train_id|   start|
+--------+--------+
|     101|08:15 AM|
|     102|02:00 PM|
|     103|06:35 PM|
|     104|01:20 PM|
|     105|03:35 PM|
|     106|07:05 AM|
|     107|01:50 PM|
|     108|04:05 PM|
|     109|08:35 AM|
|     110|01:05 PM|
|     111|05:35 PM|
|     112|10:05 AM|
|     113|02:35 PM|
|     114|07:05 AM|
|     115|01:50 PM|
+--------+--------+

+--------+--------+
|train_id|   start|
+--------+--------+
|     101|08:15 AM|
|     102|02:00 PM|
|     103|06:35 PM|
|     104|01:20 PM|
|     105|03:35 PM|
|     106|07:05 AM|
|     107|01:50 PM|
|     108|04:05 PM|
|     109|08:35 AM|
|     110|01:05 PM|
|     111|05:35 PM|
|     112|10:05 AM|
|     113|02:35 PM|
|     114|07:05 AM|
|     115|01:50 PM|
+--------+--------+

+--------+---------+---------+
|train_id|min(time)|max(time)|
+--------+---------+---------+
|     101| 08:15 AM| 12:45 PM|
|     102| 02:00 PM| 04:20 PM|
|     103| 06:35 PM| 08:50 AM|
|     104| 01:20 PM| 11:05 AM|
|     105| 03:35 PM| 

In [30]:
# Aggregating the same column twice

# There are cases where dot notation can be more cumbersome than SQL. 
# This sample code calculates the first and last times for each train line. 
# The following code does this using dot notation.

from pyspark.sql.functions import min, max, col
expr = [min(col("time")).alias('start'), max(col("time")).alias('end')]
dot_df = df.groupBy("train_id").agg(*expr)
dot_df.show(5)

+--------+--------+--------+
|train_id|   start|     end|
+--------+--------+--------+
|     101|08:15 AM|12:45 PM|
|     102|02:00 PM|04:20 PM|
|     103|06:35 PM|08:50 AM|
|     104|01:20 PM|11:05 AM|
|     105|03:35 PM|05:50 PM|
+--------+--------+--------+
only showing top 5 rows

