<a href="https://colab.research.google.com/github/mokab8308-netizen/us-ie-big-data-technologies/blob/master/pyspark/simple-pandas-pyspark-agg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Exploring Spark with Pandas


Starting from pandas examples, we convert each analysis to PySpark. This is useful when your data grows too large for single-machine tools such as pandas.

The purpose of this notebook is to familiarise yourself with the PySpark API. You are welcome to use the R version of this notebook if you wish, as long as you obtain the correct results. We use Python here because it is widely used in data science and has a very large community.



#### First, let's create our Spark session

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('panda-and-spark').getOrCreate()

### Overview


* Joining two dataframes/data sets
* Simple aggregations
* Persisting

#### JOIN: Pandas

We won't use pandas joins again in this notebook, but observe how they work.

See what happens if you change the default inner join to an outer join.

In [None]:
customer_raw = [(1, 'bob', 3462543658686),
           (2, 'rob', 9087567565439),
           (3, 'tim', 5436586999467),
           (4, 'tom', 8349756853250)]

customer_cols = ['id', 'name', 'credit_card_number']



orders_raw = [(1, 'ketchup', 'bob', 1.20),
           (2, 'rutabaga', 'bob', 3.35),
           (3, 'fake vegan meat', 'rob', 13.99),
           (4, 'cheesey poofs', 'tim', 3.99),
           (5, 'ice cream', 'tim', 4.95),
           (6, 'protein powder', 'tom', 49.95)]

orders_cols = ['id', 'product_name', 'customer', 'price']

In [None]:
customer_df = pd.DataFrame(customer_raw, columns=customer_cols)
orders_df = pd.DataFrame(orders_raw, columns=orders_cols)

customer_df

joined_df = pd.merge(customer_df, orders_df, how='inner', left_on='name', right_on='customer')
joined_df

## For self-study: what happens if (4, 'tom', 8349756853250) in customer_raw becomes (4, 'tod', 8349756853250)?
## How do the results change?
## More realistically: what if some customers have not placed any orders, but we still need them in the result set?
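For the self-study questions, a left join keeps every customer even when no order matches, and `indicator=True` adds a `_merge` column showing whether each row matched on both sides. A minimal sketch, using a hypothetical extra customer `'ann'` who has placed no orders:

```python
import pandas as pd

customers = pd.DataFrame(
    [(1, 'bob'), (2, 'rob'), (5, 'ann')],  # 'ann' is a hypothetical customer with no orders
    columns=['id', 'name'],
)
orders = pd.DataFrame(
    [(1, 'ketchup', 'bob', 1.20),
     (2, 'rutabaga', 'bob', 3.35),
     (3, 'fake vegan meat', 'rob', 13.99)],
    columns=['id', 'product_name', 'customer', 'price'],
)

# how='left' keeps every customer; order columns are NaN where nothing matched.
# indicator=True adds a '_merge' column: 'both', 'left_only' or 'right_only'.
left = pd.merge(customers, orders, how='left',
                left_on='name', right_on='customer', indicator=True)
print(left[['name', 'product_name', 'price', '_merge']])
```

Changing `how` to `'outer'` would additionally keep orders whose customer does not exist, marked `right_only`.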

#### JOIN: Spark

In [None]:
customersDF = spark.createDataFrame(customer_raw, customer_cols)

ordersDF = spark.createDataFrame(orders_raw, orders_cols)

# Show tables
customersDF.show()
ordersDF.show()

In [None]:
joinedDF = customersDF.join(ordersDF, customersDF.name == ordersDF.customer)
joinedDF.show()

## Simple Aggregations

Now let's explore simple aggregations. You will use these often when doing exploratory work on big data. Remember, the goal is to become familiar with how the API works and how to translate questions into it.

> _How much did each person spend?_

In [None]:
joined_df.groupby('name').agg({"price": ["sum"]})

In [None]:
import pyspark.sql.functions as f

joinedDF.groupby('name').agg(f.sum('price').alias('total')).show()

Now let's use bigger data:
  * NYC crash data

In [12]:
# save to the local filesystem so we don't have to download it again
! curl https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv -o rows.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  443M    0  443M    0     0  3395k      0 --:--:--  0:02:13 --:--:-- 2064k

In [13]:
import pandas as pd
nyc_df = pd.read_csv('rows.csv')



In [14]:
# number of rows

print(len(nyc_df))

# this is quite large, so we will work with a sample while we experiment, in pandas at least.

2205618


We'll take a random 20% sample of the original data.

In [15]:
nyc_small = nyc_df.sample(frac=0.2, replace=False, random_state=1)

In [16]:
# we are also going to limit the columns to those we are going to work with

nyc_small = nyc_small[['CRASH DATE', 'CONTRIBUTING FACTOR VEHICLE 1',
                       'BOROUGH', 'VEHICLE TYPE CODE 1',
                       'NUMBER OF PERSONS INJURED']]

In [17]:
nyc_small.head(2)

|  | CRASH DATE | CONTRIBUTING FACTOR VEHICLE 1 | BOROUGH | VEHICLE TYPE CODE 1 | NUMBER OF PERSONS INJURED |
|---|---|---|---|---|---|
| 1909713 | 06/24/2013 | Driver Inattention/Distraction | BROOKLYN | PASSENGER VEHICLE | 0.0 |
| 1253713 | 06/28/2016 | Unsafe Lane Changing |  | 4 dr sedan | 0.0 |


Now let's create the PySpark DataFrame. We then have two frames with the same content:
  * nyc_small: pandas
  * sdf_small: PySpark

In [20]:
from pyspark.sql import SparkSession

# Create the SparkSession if it doesn't exist (otherwise reuse the existing one)
spark = SparkSession.builder.appName('panda-and-spark').getOrCreate()

# String columns that contain NaN (a float) mix two types, so Spark can't infer their schema.
# We help it out by replacing NaNs with empty strings and casting every column to string.
# SQLContext is deprecated since Spark 3.0; SparkSession.createDataFrame does the same job.
sdf_small = spark.createDataFrame(nyc_small.fillna('').astype('str'))


# Let's check the schema quickly

print(sdf_small.schema)



StructType([StructField('CRASH DATE', StringType(), True), StructField('CONTRIBUTING FACTOR VEHICLE 1', StringType(), True), StructField('BOROUGH', StringType(), True), StructField('VEHICLE TYPE CODE 1', StringType(), True), StructField('NUMBER OF PERSONS INJURED', StringType(), True)])


# Questions

Answer the following questions by porting the pandas code to the Spark API.



# Question 1


> On what day do most crashes occur?

In [21]:
# Pandas
nyc_small.groupby('CRASH DATE')['CRASH DATE'].count().sort_values(ascending=False).head(5)

| CRASH DATE | count |
|---|---|
| 01/21/2014 | 261 |
| 12/15/2017 | 229 |
| 11/15/2018 | 213 |
| 05/19/2017 | 211 |
| 01/18/2015 | 198 |


In [None]:
## Spark?
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def main():
    """
    Load rows.csv and print the top five dates with the most crashes.
    """
    spark = SparkSession.builder.appName("TopCrashDates").getOrCreate()

    print("--- Loading data from rows.csv ---")
    try:
        df = (
            spark.read
                 .option("header", True)
                 .option("inferSchema", True)
                 .csv("rows.csv")
        )
    except Exception as e:
        print("Error: Could not load 'rows.csv'.")
        print(f"Details: {e}")
        spark.stop()
        return

    # --- Find the crash-date column (handles both 'CRASH DATE' and 'CRASH_DATE')
    crash_date_col = "CRASH DATE" if "CRASH DATE" in df.columns else \
                     "CRASH_DATE" if "CRASH_DATE" in df.columns else None
    if crash_date_col is None:
        print("Error: No 'CRASH DATE' or 'CRASH_DATE' column found.")
        spark.stop()
        return

    # --- Ensure it's a proper date (dataset uses MM/dd/yyyy)
    # Use df[crash_date_col] so names with spaces work.
    df2 = (
        df.filter(df[crash_date_col].isNotNull())
          .withColumn("CRASH_DATE",
                      F.to_date(df[crash_date_col], "MM/dd/yyyy"))
    )

    # If parsing failed (already a date type in some files), coalesce to original cast
    df2 = df2.withColumn(
        "CRASH_DATE",
        F.coalesce(F.col("CRASH_DATE"), F.to_date(df[crash_date_col]))
    )

    # --- Group, count, sort, and take top 5
    result_df = (
        df2.groupBy("CRASH_DATE")
           .count()
           .orderBy(F.desc("count"))
           .limit(5)
    )

    print("\n--- Top 5 dates with most crashes ---")
    result_df.show(truncate=False)

    spark.stop()  # in a notebook this also stops the shared session, so sdf_small can no longer be used
    print("\n--- Spark session stopped ---")

if __name__ == "__main__":
    main()



--- Loading data from rows.csv ---

--- Top 5 dates with most crashes ---
+----------+-----+
|CRASH_DATE|count|
+----------+-----+
|2014-01-21|1161 |
|2018-11-15|1065 |
|2017-12-15|999  |
|2017-05-19|974  |
|2015-01-18|961  |
+----------+-----+


--- Spark session stopped ---


# Question 2

> _Where do most crashes occur?_

In [34]:
nyc_small.groupby('BOROUGH')['BOROUGH'].count().sort_values(ascending=False).head(5)

| BOROUGH | count |
|---|---|
| BROOKLYN | 98083 |
| QUEENS | 81779 |
| MANHATTAN | 67716 |
| BRONX | 45144 |
| STATEN ISLAND | 12680 |


In [38]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def main():
    """
    Load rows.csv and print the top 5 BOROUGHs with the most crashes.
    """
    spark = SparkSession.builder.appName("TopBoroughsByCrashes").getOrCreate()

    print("--- Loading data from rows.csv ---")
    try:
        df = (
            spark.read
                 .option("header", True)
                 .option("inferSchema", True)
                 .csv("rows.csv")
        )
    except Exception as e:
        print("Error: Could not load 'rows.csv'.")
        print("Please ensure the file is in the same directory as this script.")
        print(f"Details: {e}")
        spark.stop()
        return

    # Ensure the BOROUGH column exists
    if "BOROUGH" not in df.columns:
        print("Error: 'BOROUGH' column not found in rows.csv.")
        spark.stop()
        return

    # Clean borough names (trim + uppercase), filter out null/empty, then aggregate
    result_df = (
        df.withColumn("BOROUGH_CLEAN", F.upper(F.trim(F.col("BOROUGH"))))
          .filter(F.col("BOROUGH_CLEAN").isNotNull() & (F.col("BOROUGH_CLEAN") != ""))
          .groupBy("BOROUGH_CLEAN")
          .count()
          .orderBy(F.desc("count"))
          .limit(5)
    )

    print("\n--- Top 5 BOROUGHs with most crashes ---")
    result_df.show(truncate=False)

    spark.stop()  # in a notebook this also stops the shared session, so sdf_small can no longer be used
    print("\n--- Spark session stopped ---")

if __name__ == "__main__":
    main()


--- Loading data from rows.csv ---

--- Top 5 BOROUGHs with most crashes ---
+-------------+------+
|BOROUGH_CLEAN|count |
+-------------+------+
|BROOKLYN     |489424|
|QUEENS       |409665|
|MANHATTAN    |338848|
|BRONX        |226360|
|STATEN ISLAND|64032 |
+-------------+------+


--- Spark session stopped ---


# Question 3

> What is the most common cause of accidents in 'QUEENS'?

In [39]:
nyc_small[(nyc_small.BOROUGH == 'QUEENS')]['CONTRIBUTING FACTOR VEHICLE 1'].value_counts()

# you can also use a group by (to avoid the pandas value_counts function)

nyc_small[(nyc_small.BOROUGH == 'QUEENS')].groupby(
    'CONTRIBUTING FACTOR VEHICLE 1'
)['CONTRIBUTING FACTOR VEHICLE 1'].count().sort_values(ascending=False).head(5)

| CONTRIBUTING FACTOR VEHICLE 1 | count |
|---|---|
| Unspecified | 27921 |
| Driver Inattention/Distraction | 17672 |
| Failure to Yield Right-of-Way | 7102 |
| Backing Unsafely | 3917 |
| Following Too Closely | 2995 |


In [40]:
## Spark?
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("TopCauseQueens").getOrCreate()

# Load CSV
df = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv("rows.csv"))

# Pick the contributing-factor column (handles common variants)
factor_candidates = [
    "CONTRIBUTING FACTOR VEHICLE 1",
    "CONTRIBUTING_FACTOR_VEHICLE_1",
    "CONTRIBUTING FACTOR",
    "PRIMARY CONTRIBUTING FACTOR",
    "PRIMARY_CONTRIBUTING_FACTOR",
]
factor_col = next((c for c in factor_candidates if c in df.columns), None)
if factor_col is None:
    raise ValueError("No contributing-factor column found in rows.csv")

# Compute top cause in QUEENS
top_cause = (
    df.withColumn("BOROUGH_CLEAN", F.upper(F.trim(F.col("BOROUGH"))))
      .withColumn("CAUSE", F.upper(F.trim(F.col(factor_col))))
      .filter((F.col("BOROUGH_CLEAN") == "QUEENS") & F.col("CAUSE").isNotNull() & (F.col("CAUSE") != ""))
      .groupBy("CAUSE")
      .count()
      .orderBy(F.desc("count"))
      .limit(1)
)

print("\nMost common cause in QUEENS:")
top_cause.show(truncate=False)

# (Optional) exclude vague labels like 'UNSPECIFIED' / 'OTHER' and look at the top 5 again:
exclude = ["UNSPECIFIED", "OTHER", "OTHER VEHICULAR", "UNKNOWN"]
meaningful_top = (
    df.withColumn("BOROUGH_CLEAN", F.upper(F.trim(F.col("BOROUGH"))))
      .withColumn("CAUSE", F.upper(F.trim(F.col(factor_col))))
      .filter((F.col("BOROUGH_CLEAN") == "QUEENS") & F.col("CAUSE").isNotNull() & (F.col("CAUSE") != ""))
      .filter(~F.col("CAUSE").isin(exclude))
      .groupBy("CAUSE")
      .count()
      .orderBy(F.desc("count"))
      .limit(5)
)

print("\nMost common *meaningful* cause in QUEENS (excluding Unspecified/Other):")
meaningful_top.show(truncate=False)





Most common cause in QUEENS:
+-----------+------+
|CAUSE      |count |
+-----------+------+
|UNSPECIFIED|140132|
+-----------+------+


Most common *meaningful* cause in QUEENS (excluding Unspecified/Other):
+------------------------------+-----+
|CAUSE                         |count|
+------------------------------+-----+
|DRIVER INATTENTION/DISTRACTION|88677|
|FAILURE TO YIELD RIGHT-OF-WAY |35342|
|BACKING UNSAFELY              |19850|
|FOLLOWING TOO CLOSELY         |15102|
|PASSING OR LANE USAGE IMPROPER|11653|
+------------------------------+-----+



# Question 4

> _What is the average number of injuries for specific vehicle types in specific boroughs?_


In [42]:
nyc_small.groupby(['VEHICLE TYPE CODE 1', 'BOROUGH'])['NUMBER OF PERSONS INJURED'].mean().sort_values(ascending=False).head(3)

| VEHICLE TYPE CODE 1 | BOROUGH | mean NUMBER OF PERSONS INJURED |
|---|---|---|
| FRONT | BROOKLYN | 12.0 |
| MTA B | BROOKLYN | 7.0 |
| Amb | BROOKLYN | 4.0 |


In [43]:
## Spark?
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv("rows.csv"))

# Pick columns (handles common variants / spaces)
borough_col = next((c for c in ["BOROUGH", "Borough"] if c in df.columns), None)
veh_col     = next((c for c in ["VEHICLE TYPE CODE 1", "VEHICLE_TYPE_CODE_1",
                                "VEHICLE TYPE", "VEHICLE_TYPE"] if c in df.columns), None)
inj_col     = next((c for c in ["NUMBER OF PERSONS INJURED", "PERSONS INJURED",
                                "NUMBER_OF_PERSONS_INJURED", "PERSONS_INJURED"] if c in df.columns), None)
assert borough_col and veh_col and inj_col, "Required columns not found."

# Compute average injuries per (BOROUGH, VEHICLE) and show top 3
result = (df
    .withColumn("BOROUGH_CLEAN", F.upper(F.trim(F.col(borough_col))))
    .withColumn("VEHICLE_CLEAN", F.upper(F.trim(F.col(veh_col))))
    .withColumn("INJ", F.col(inj_col).cast("double"))
    .filter(F.col("BOROUGH_CLEAN").isNotNull() & (F.col("BOROUGH_CLEAN") != ""))
    .filter(F.col("VEHICLE_CLEAN").isNotNull() & (F.col("VEHICLE_CLEAN") != ""))
    .groupBy("BOROUGH_CLEAN", "VEHICLE_CLEAN")
    .agg(
        F.round(F.avg("INJ"), 3).alias("AVG_INJURIES"),
        F.count("*").alias("N_RECORDS")
    )
    .orderBy(F.desc("AVG_INJURIES"), F.desc("N_RECORDS"))
    .limit(3)
)

print("\nTop 3 (BOROUGH, VEHICLE) by average injuries:")
result.show(truncate=False)



Top 3 (BOROUGH, VEHICLE) by average injuries:
+-------------+-------------+------------+---------+
|BOROUGH_CLEAN|VEHICLE_CLEAN|AVG_INJURIES|N_RECORDS|
+-------------+-------------+------------+---------+
|BROOKLYN     |FRONT        |12.0        |1        |
|QUEENS       |RMB          |11.0        |1        |
|MANHATTAN    |MTA B        |9.0         |1        |
+-------------+-------------+------------+---------+

