<a href="https://colab.research.google.com/github/martinpius/Practicals_final/blob/main/ST_7203_%3EIntro_to_Big_Data_Processing_with_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType, TimestampType, FloatType
import requests, re, os
from pyspark.sql.types import StructField, StructType

In [None]:
from google.colab import output

In [None]:
# Start (or reuse) the Spark session and open its web UI in a Colab window.
spark = SparkSession.builder.appName("st7203_spark_df").getOrCreate()
ui = spark.sparkContext.uiWebUrl
# uiWebUrl is None when the UI is disabled. The pattern is anchored to the end
# of the URL so digits inside the host part (e.g. an IPv6 literal) are never
# mistaken for the port.
port_match = re.search(r":(\d+)/?$", ui) if ui else None
if port_match is None:
    raise RuntimeError(f"Cannot determine the Spark UI port from uiWebUrl={ui!r}")
port = int(port_match.group(1))
output.serve_kernel_port_as_window(port=port, path="/jobs/")

In [None]:
# Three sample records, each laid out as [Id, first name, surname, age].
rdd_bio = spark.sparkContext.parallelize(
    [
        ["100", "John", "Doe", 28],
        ["101", "Rose", "Kevin", 30],
        ["102", "Ander", "Linus", 22],
    ]
)

In [None]:
# Explicit schema for the bio records; every field is nullable.
schema = StructType([
    StructField("Id", StringType(), True),
    StructField("fname", StringType(), True),
    StructField("surname", StringType(), True),
    StructField("age", IntegerType(), True),
])

In [None]:
bio_df = spark.createDataFrame(data = rdd_bio, schema = schema)

In [None]:
bio_df.columns

In [None]:
bio_df.count()

In [None]:
bio_df.show()

In [None]:
pd_df_bio = bio_df.toPandas()

In [None]:
pd_df_bio.head()

In [None]:
# Data engineering: derive a salary column as 600 times the age.
bio_df = bio_df.withColumn(
    "salary",
    F.col("age") * F.lit(600),
)

In [None]:
bio_df.show()

In [None]:
bio_df.schema

In [None]:
# One-row summary of the age column: sum, mean, standard deviation, max, min.
summary_stats = bio_df.agg(*[
    stat("age").alias(label)
    for label, stat in (
        ("age_sum", F.sum),
        ("avg_age", F.mean),
        ("age_std", F.std),
        ("max_age", F.max),
        ("min_age", F.min),
    )
])

In [None]:
summary_stats.show()

In [None]:
# Names of the columns whose Spark type is integer, double or float.
num_cols = [
    field.name
    for field in bio_df.schema.fields
    if isinstance(field.dataType, (IntegerType, DoubleType, FloatType))
]

In [None]:
num_cols

In [None]:
# For every numeric column: total, mean and standard deviation, each rounded
# to two decimals. Output order is all totals, then all means, then all stds.
total_summary = bio_df.agg(
    *(F.round(F.sum(name), 2).alias(f"{name}_total") for name in num_cols),
    *(F.round(F.mean(name), 2).alias(f"{name}_mean") for name in num_cols),
    *(F.round(F.std(name), 2).alias(f"{name}_std") for name in num_cols),
)

In [None]:
total_summary.show()

In [None]:
bio_df.show()

In [None]:
# Build "<fname>.<surname>@gmail.com" in lower case for every row.
bio_df = bio_df.withColumn(
    "email",
    F.concat(
        F.lower(F.col("fname")),
        F.lit("."),
        F.lower(F.col("surname")),
        F.lit("@gmail.com"),
    ),
)

In [None]:
bio_df.show(truncate = False)

In [None]:
bio_youth = bio_df.filter("age < 30")

In [None]:
bio_youth.show()

In [None]:
from pyspark.sql import SparkSession
from google.colab import drive, output
import os, requests, re
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DateType, IntegerType, DoubleType, FloatType

In [None]:
# Start (or reuse) the Spark session and open its web UI in a Colab window.
spark = SparkSession.builder.appName("spark_df_st7203").getOrCreate()
ui = spark.sparkContext.uiWebUrl
# uiWebUrl is None when the UI is disabled. The pattern is anchored to the end
# of the URL so digits inside the host part (e.g. an IPv6 literal) are never
# mistaken for the port.
port_match = re.search(r":(\d+)/?$", ui) if ui else None
if port_match is None:
    raise RuntimeError(f"Cannot determine the Spark UI port from uiWebUrl={ui!r}")
port = int(port_match.group(1))
output.serve_kernel_port_as_window(port=port, path="/jobs/")

In [None]:
from pyspark.sql.types import StructField, StructType

In [None]:
# Explicit schema for the employee records; every field is nullable.
schema = StructType([
    StructField("Id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("surname", StringType(), True),
    StructField("age", IntegerType(), True),
])

In [None]:
# Six sample records, each laid out as [Id, first_name, surname, age].
bio_rdd = spark.sparkContext.parallelize([
    ["100", "Erick", "James", 33],
    ["101", "Norbert", "Ricco", 27],
    ["102", "Amanda", "Luis", 21],
    ["103", "Sam", "Johns", 29],
    ["104", "Sonia", "De Ross", 30],
    ["105", "Kate", "Stacks", 33],
])

In [None]:
bio_rdd.take(6)

In [None]:
bio_df = spark.createDataFrame(data = bio_rdd, schema = schema)

In [None]:
bio_df.show()

In [None]:
# Data engineering in Spark
# Build "<first_name>.<surname>@company.com" in lower case. Whitespace inside
# the surname (e.g. "De Ross") is stripped so the address contains no spaces.
bio_df = bio_df.withColumn(
    "email",
    F.concat(
        F.lower(F.col("first_name")),
        F.lit("."),
        F.regexp_replace(F.lower(F.col("surname")), r"\s+", ""),
        F.lit("@company.com"),
    ),
)

In [None]:
bio_df.show(n = 3, truncate = False)

In [None]:
bio_df.columns

In [None]:
bio_df.schema[0]

In [None]:
# Derive a salary column as 100 times the age.
bio_df = bio_df.withColumn(
    "salary",
    F.col("age") * F.lit(100),
)

In [None]:
bio_df.show(n = 6, truncate = False)

In [None]:
# Flat 7% tax, rounded to two decimals. The tax base is the salary column
# derived in the previous step, not the age.
bio_df = bio_df.withColumn(
    "tax",
    F.round(F.col("salary") * 0.07, 2),
)

In [None]:
bio_df.show(n = 6, truncate= False)

In [None]:
# Names of the integer-typed columns ("int" is how IntegerType appears in DataFrame.dtypes).
intcols = [col_name for col_name, type_name in bio_df.dtypes if type_name == "int"]

In [None]:
intcols

In [None]:
# Sum, mean and standard deviation of every integer column.
# Output order is all sums, then all means, then all stds.
summary_stats = bio_df.agg(
    *(F.sum(name).alias(f"{name}_sum") for name in intcols),
    *(F.mean(name).alias(f"{name}_mean") for name in intcols),
    *(F.std(name).alias(f"{name}_std") for name in intcols),
)

In [None]:
summary_stats.show()

In [None]:
!pip install kaggle --quiet

In [None]:
from google.colab import files

In [None]:
files.upload()

In [None]:
#  Setup credentials
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets download -d elemento/nyc-yellow-taxi-trip-data --unzip

In [None]:
!ls -lh

In [None]:
# The second import cell of this notebook does not bring in TimestampType, so
# import it next to its only use; otherwise running from that cell onward
# fails here with a NameError.
from pyspark.sql.types import TimestampType

# Explicit schema applied when reading the yellow-taxi CSV: 19 nullable
# columns, listed in file order.
schema = StructType([
    StructField("vendor_id", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("rate_code", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
])


In [None]:
# Load the January 2015 trips with the explicit schema. The first line is a
# header, and rows that do not fit the schema are dropped (DROPMALFORMED).
df_yellow = (
    spark.read
    .schema(schema)
    .option("header", True)
    .option("mode", "DROPMALFORMED")
    .csv("/content/yellow_tripdata_2015-01.csv")
)

In [None]:
df_yellow.count()

In [None]:
df_yellow.show(n = 10, truncate = False)

In [None]:
# Names of the columns whose Spark type is integer, float or double.
numcols = [
    field.name
    for field in df_yellow.schema.fields
    if isinstance(field.dataType, (IntegerType, FloatType, DoubleType))
]

In [None]:
numcols

In [None]:
# Total, mean and standard deviation of every numeric trip column.
# Output order is all totals, then all means, then all stds.
stat_summary = df_yellow.agg(
    *(F.sum(name).alias(f"{name}_total") for name in numcols),
    *(F.mean(name).alias(f"{name}_mean") for name in numcols),
    *(F.std(name).alias(f"{name}_std") for name in numcols),
)

In [None]:
stat_summary.show()

In [None]:
df_yellow.show(n = 4, truncate = False)

In [None]:
def pickdate(c):
    """Return the text of column ``c`` that precedes the first space.

    For values shaped like "YYYY-MM-DD HH:MM:SS" this is the date part.
    """
    pieces = F.split(c, " ")
    return pieces[0]

In [None]:
df_yellow = df_yellow.withColumn("pickupdate", pickdate("pickup_datetime"))

In [None]:
df_yellow.show(n = 4, truncate = False)

In [None]:
def fetch_time(c):
    """Return the second space-separated token of the column named ``c``.

    For values shaped like "YYYY-MM-DD HH:MM:SS" this is the time part.
    """
    return F.split(F.col(c), " ")[1]


In [None]:
# Add the time-of-day part of both timestamps as separate columns.
df_yellow = (
    df_yellow
    .withColumn("pickuptime", fetch_time("pickup_datetime"))
    .withColumn("dropofftime", fetch_time("dropoff_datetime"))
)

In [None]:
df_yellow.show(n = 3, truncate = False)

In [None]:
def time_sum(c):
    """Convert the "HH:MM:SS" column named ``c`` to a number of seconds.

    The value is split on ":" and combined as hours * 3600 + minutes * 60 +
    seconds, with each part cast to int.
    """
    hh_mm_ss = F.split(F.col(c), ":")
    hours, minutes, seconds = (hh_mm_ss.getItem(i).cast("int") for i in range(3))
    return hours * 3600 + minutes * 60 + seconds

In [None]:
# Express both time-of-day columns as seconds.
df_yellow = (
    df_yellow
    .withColumn("pickup_secs", time_sum("pickuptime"))
    .withColumn("dropoff_secs", time_sum("dropofftime"))
)

In [None]:
df_yellow.show(n = 3, truncate = False)

In [None]:
# Keep only trips picked up from 2015-01-01 up to, but not including, 2015-01-15.
df_yellow_small = df_yellow.filter('pickup_datetime >= "2015-01-01" AND pickup_datetime < "2015-01-15"')

In [None]:
df_yellow_small.count()

In [None]:
df_yellow_small.select("vendor_id").distinct().show(2)

In [None]:
# Per vendor: smallest pickup_secs, largest dropoff_secs, and the gap between
# them in hours.
# NOTE(review): both inputs are time-of-day values in seconds, so the date is
# ignored here; confirm this is the intended meaning of "trip_duration".
(
    df_yellow_small
    .groupBy("vendor_id")
    .agg(
        F.min("pickup_secs").alias("start_trip"),
        F.max("dropoff_secs").alias("end_trip"),
    )
    .withColumn(
        "trip_duration",
        (F.col("end_trip") - F.col("start_trip")) / F.lit(3600),
    )
    .show(n=3, truncate=False)
)

In [None]:
# SQL

In [None]:
df_yellow_small.createOrReplaceTempView("small_yellow")

In [None]:
# Ten most frequent (vendor, pickup time, drop-off time) combinations in the
# small_yellow view, most frequent first.
top10_trips = spark.sql("""
SELECT vendor_id, pickup_datetime, dropoff_datetime,
COUNT(*) AS total_trips
FROM small_yellow
GROUP BY vendor_id, pickup_datetime, dropoff_datetime
ORDER BY total_trips DESC
LIMIT 10
""")

In [None]:
top10_trips.show(n = 10, truncate = False)

In [None]:
# Ten most frequent (pickup location, passenger count, fare) combinations,
# restricted to rows where both pickup coordinates are present.
samp1 = spark.sql("""
SELECT pickup_latitude,
       pickup_longitude,
       passenger_count,
       fare_amount ,
       COUNT(*) as trip_count FROM small_yellow
       WHERE pickup_latitude IS NOT NULL and pickup_longitude IS NOT NULL
       GROUP BY pickup_longitude,passenger_count, pickup_latitude, fare_amount
       ORDER BY trip_count DESC LIMIT 10
""")

In [None]:
samp1.show(n = 10, truncate = False)

In [None]:
# Total number of trips per pickup date, in chronological order.
total_trip_per_day = spark.sql("""
SELECT
  DATE(pickup_datetime) AS trip_date,
  COUNT(*) AS total_trips
FROM small_yellow
GROUP BY trip_date
ORDER BY trip_date
""")

In [None]:
total_trip_per_day.show(10)

In [None]:
# Average trip distance per pickup date, rounded to two decimals and listed
# in chronological order.
avg_trip_distance_per_day = spark.sql(
    """
  SELECT
  DATE(pickup_datetime) AS trip_date,
  ROUND(AVG(trip_distance), 2) AS avg_trip_distance
  FROM small_yellow
  GROUP BY trip_date
  ORDER BY trip_date
    """
)

In [None]:
avg_trip_distance_per_day.show(10)

In [None]:
# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Trips per pickup hour: aggregated in Spark, then collected to pandas so the
# small result can be plotted locally.
hourly_trips = spark.sql("""
    SELECT
    HOUR(pickup_datetime) AS hour,
    COUNT(*) AS trip_count
    FROM small_yellow
    GROUP BY hour
    ORDER BY hour
""").toPandas()

plt.figure(figsize=(10, 6))
ax = sns.barplot(data=hourly_trips, x='hour', y='trip_count', palette='viridis')
ax.set(
    title='Number of Taxi Trips by Hour of Day',
    xlabel='Hour of Day',
    ylabel='Number of Trips',
)
plt.show()

In [None]:
import plotly.express as px

# Collect up to 10,000 pickup coordinates that fall inside a latitude /
# longitude bounding box (40.5..41.0, -74.3..-73.7) into a pandas DataFrame.
# NOTE(review): there is no ORDER BY, so which rows LIMIT keeps is presumably
# not guaranteed to be the same on every run.
pickup_df = spark.sql("""
    SELECT pickup_latitude, pickup_longitude
    FROM small_yellow
    WHERE
      pickup_latitude BETWEEN 40.5 AND 41.0 AND
      pickup_longitude BETWEEN -74.3 AND -73.7
      AND pickup_latitude IS NOT NULL
      AND pickup_longitude IS NOT NULL
    LIMIT 10000
""").toPandas()

In [None]:
# Scatter map of at most 5,000 randomly chosen pickups. The sample size is
# capped at the number of available rows because DataFrame.sample raises
# ValueError when asked for more rows than exist without replacement.
scatter_points = pickup_df.sample(n=min(5000, len(pickup_df)))

fig = px.scatter_mapbox(
    scatter_points,
    lat="pickup_latitude",
    lon="pickup_longitude",
    zoom=10,
    center={"lat": 40.7128, "lon": -74.0060},
    height=600,
    mapbox_style="open-street-map",
    title="NYC Taxi Pickups - Scatter"
)

fig.show()

In [None]:
# Density heat map of pickups, drawn from 15,000 rows sampled with replacement.
heat_points = pickup_df.sample(n=15000, replace=True)
fig = px.density_mapbox(
    heat_points,
    lat="pickup_latitude",
    lon="pickup_longitude",
    radius=5,
    zoom=11,
    center={"lat": 40.7128, "lon": -74.0060},
    mapbox_style="open-street-map",
    title="NYC Taxi Pickup Density",
)
fig.show()