In [None]:
sc

In [None]:
%pip install geopy

Python interpreter will be restarted.
Collecting geopy
  Downloading geopy-2.4.1-py3-none-any.whl (125 kB)
Collecting geographiclib<3,>=1.52
  Downloading geographiclib-2.0-py3-none-any.whl (40 kB)
Installing collected packages: geographiclib, geopy
Successfully installed geographiclib-2.0 geopy-2.4.1
Python interpreter will be restarted.


## Transformation Phase
1. Finding and Handling Missing Values
2. Convert TimeStamp from String to datetime
3. Create Merge_date field for merging the data based on source and timeStamp
5. Create new column with dollar per mile for each cab
4. Drop Duplicates in weather data
5. Join Cabs and Weather Data based on merge_date




In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
cabs_df = spark.read.format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", "xxxxxxxxxxxxxxxxxxx") \
    .option("spark.cosmos.accountKey", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") \
    .option("spark.cosmos.database", "cabsDB") \
    .option("spark.cosmos.container", "cabs_data_backup") \
    .load()

In [None]:
cabs_df.repartition(col('source'))

Out[3]: DataFrame[name: string, cab_type: string, time_stamp: string, source: string, price: string, id: string, product_id: string, surge_multiplier: string, destination: string, distance: string]

In [None]:
    cabs_df = cabs_df.cache()

In [None]:
cabs_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- time_stamp: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price: string (nullable = true)
 |-- id: string (nullable = false)
 |-- product_id: string (nullable = true)
 |-- surge_multiplier: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- distance: string (nullable = true)



In [None]:
cabs_schema = StructType([
    StructField("distance", DoubleType(), nullable=True),
    StructField("cab_type", StringType(), nullable=True),
    StructField("time_stamp", DoubleType(), nullable=True),
    StructField("destination", StringType(), nullable=True),
    StructField("source", StringType(), nullable=True),
    StructField("price", DoubleType(), nullable=True),
    StructField("surge_multiplier", DoubleType(), nullable=True),
    StructField("id", StringType(), nullable=True),
    StructField("product_id", StringType(), nullable=True),
    StructField("name", StringType(), nullable=True)
])

cabs_df = cabs_df.select(
    [col(field.name).cast(field.dataType) for field in cabs_schema.fields]
)

In [None]:
cabs_df = cabs_df.cache()

In [None]:
cabs_df.printSchema()

root
 |-- distance: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- time_stamp: double (nullable = true)
 |-- destination: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price: double (nullable = true)
 |-- surge_multiplier: double (nullable = true)
 |-- id: string (nullable = false)
 |-- product_id: string (nullable = true)
 |-- name: string (nullable = true)



In [None]:
weather_df = spark.read.format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", "https://azcosmodbtask.documents.azure.com:443/") \
    .option("spark.cosmos.accountKey", "sdvloiD1P9uL4g4YDrCYV74cXZ5hfKSNPfEvcIBWnM3N90w8ZNATyc44mi6nGk86gzUiKodsjrm8ACDbGiojJQ==") \
    .option("spark.cosmos.database", "cabsDB") \
    .option("spark.cosmos.container", "weather") \
    .load()

In [None]:
new_schema = StructType([
    StructField("temp", DoubleType(), nullable=True),
    StructField("location", StringType(), nullable=True),
    StructField("clouds", DoubleType(), nullable=True),
    StructField("pressure", DoubleType(), nullable=True),
    StructField("rain", DoubleType(), nullable=True),
    StructField("time_stamp", IntegerType(), nullable=True),
    StructField("humidity", DoubleType(), nullable=True),
    StructField("wind", DoubleType(), nullable=True)
])

weather_df = weather_df.select(
    [col(field.name).cast(field.dataType) for field in new_schema.fields]
)

In [None]:
weather_df = weather_df.cache()
weather_df.printSchema()

root
 |-- temp: double (nullable = true)
 |-- location: string (nullable = true)
 |-- clouds: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- rain: double (nullable = true)
 |-- time_stamp: integer (nullable = true)
 |-- humidity: double (nullable = true)
 |-- wind: double (nullable = true)



# Shape of data

In [None]:
cabs_df.count()

Out[15]: 693071

In [None]:
len(cabs_df.columns)

Out[16]: 10

In [None]:
weather_df.count()

Out[17]: 6276

In [None]:
len(weather_df.columns)

Out[18]: 8

In [None]:
cabs_df.printSchema()

root
 |-- distance: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- time_stamp: double (nullable = true)
 |-- destination: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price: double (nullable = true)
 |-- surge_multiplier: double (nullable = true)
 |-- id: string (nullable = false)
 |-- product_id: string (nullable = true)
 |-- name: string (nullable = true)



In [None]:
weather_df.printSchema()

root
 |-- temp: double (nullable = true)
 |-- location: string (nullable = true)
 |-- clouds: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- rain: double (nullable = true)
 |-- time_stamp: integer (nullable = true)
 |-- humidity: double (nullable = true)
 |-- wind: double (nullable = true)



In [None]:
from pyspark.sql.functions import *

### Finding and Handling the Null values 

##### Cabs Data

In [None]:
cabs_df.select([sum(when(col(c).isNull(),1).otherwise(0)).alias(c) for c in cabs_df.columns]).show()

+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+
|distance|cab_type|time_stamp|destination|source|price|surge_multiplier| id|product_id|name|
+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+
|       0|       0|         0|          0|     0|55095|               0|  0|         0|   0|
+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+



In [None]:
cabs_df_cleaned = cabs_df.dropna()

In [None]:
cabs_df_cleaned.select([sum(when(col(c).isNull(),1).otherwise(0)).alias(c) for c in cabs_df_cleaned.columns]).show()

+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+
|distance|cab_type|time_stamp|destination|source|price|surge_multiplier| id|product_id|name|
+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+
|       0|       0|         0|          0|     0|    0|               0|  0|         0|   0|
+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+



##### Weather Data

In [None]:
weather_df.select([sum(when(col(c).isNull(),1).otherwise(0)).alias(c) for c in weather_df.columns]).show()

+----+--------+------+--------+----+----------+--------+----+
|temp|location|clouds|pressure|rain|time_stamp|humidity|wind|
+----+--------+------+--------+----+----------+--------+----+
|   0|       0|     0|       0|5382|         0|       0|   0|
+----+--------+------+--------+----+----------+--------+----+



In [None]:
weather_df_cleaned = weather_df.fillna({'rain':0.0})

In [None]:
weather_df_cleaned.select([sum(when(col(c).isNull(),1).otherwise(0)).alias(c) for c in weather_df_cleaned.columns]).show()

+----+--------+------+--------+----+----------+--------+----+
|temp|location|clouds|pressure|rain|time_stamp|humidity|wind|
+----+--------+------+--------+----+----------+--------+----+
|   0|       0|     0|       0|   0|         0|       0|   0|
+----+--------+------+--------+----+----------+--------+----+



### Convert TimeStamp from String to datetime

In [None]:
cabs_df_tf = cabs_df_cleaned.withColumn("time_stamp", to_timestamp(col("time_stamp") / 1000)) \
    .withColumn("date", date_format(col("time_stamp"), 'yyyy-MM-dd')) \
    .withColumn("time", hour(col("time_stamp"))) \
    .withColumn("weekday", dayofweek(col("time_stamp")) - 1)  

weather_df_tf = weather_df_cleaned.withColumn("time_stamp", to_timestamp(col("time_stamp"))) \
    .withColumn("date", date_format(col("time_stamp"), 'yyyy-MM-dd')) \
    .withColumn("time", hour(col("time_stamp")))


### Finding out the Dollar per mile 

In [None]:
cabs_df_tf = cabs_df_tf.withColumn("dollars_per_mile", col("price") / col("distance"))

#### Find Total revenue

In [None]:
cabs_df_tf = cabs_df_tf.withColumn('revenue', col('price') * col('surge_multiplier'))

In [None]:
cabs_df_tf.select(['time_stamp']).show(2)

+-------------------+
|         time_stamp|
+-------------------+
|2018-12-14 12:20:00|
|2018-12-14 12:20:00|
+-------------------+
only showing top 2 rows



In [None]:
weather_df_tf.select(['time_stamp']).show(2)

+-------------------+
|         time_stamp|
+-------------------+
|2018-12-14 23:45:01|
|2018-12-13 12:45:01|
+-------------------+
only showing top 2 rows



### Create Merge_date field for merging the data based on source and timeStamp

In [None]:
cabs_df_tf = cabs_df_tf.withColumn("merge_date", 
                           concat_ws(" - ", 
                                     col("source").cast("string"), 
                                     date_format(col("time_stamp"), 'yyyy-MM-dd'), 
                                     hour(col("time_stamp")).cast("string")))

weather_df_tf = weather_df_tf.withColumn("merge_date", 
                             concat_ws(" - ", 
                                       col("location").cast("string"), 
                                       date_format(col("time_stamp"), 'yyyy-MM-dd'), 
                                       hour(col("time_stamp")).cast("string")))



In [None]:
weather_df_tf.select(['merge_date']).show(5, truncate=False)

+-------------------------------+
|merge_date                     |
+-------------------------------+
|North Station - 2018-12-14 - 23|
|North Station - 2018-12-13 - 12|
|North Station - 2018-11-27 - 22|
|North Station - 2018-11-28 - 22|
|North Station - 2018-11-30 - 8 |
+-------------------------------+
only showing top 5 rows



#### Drop Duplicates in wheather data

In [None]:
weather_df_tf = weather_df_tf.dropDuplicates(['merge_date'])

In [None]:
weather_df_tf.count()

Out[36]: 3960

##### Rename columns with same name

In [None]:
weather_df_tf = weather_df_tf.withColumnRenamed('time_stamp','time_stamp_w')
weather_df_tf = weather_df_tf.withColumnRenamed('date','date_w')
weather_df_tf = weather_df_tf.withColumnRenamed('time','time_w')
weather_df_tf = weather_df_tf.withColumnRenamed('merge_date','merge_date_w')
weather_df_tf = weather_df_tf.withColumnRenamed('id','id_w')

### Join Cabs and Weather Data

In [None]:
# A broadcast join occurs when the smaller dataset (or DataFrame) is sent to all worker nodes in the Spark cluster. This allows each node to perform the join 
#  operation locally without needing to shuffle large amounts of data across the network, which can be time-consuming and resource-intensive.

from pyspark.sql.functions import broadcast

weather_df_tf = broadcast(weather_df_tf)

In [None]:
joined_df = cabs_df_tf.join(weather_df_tf, cabs_df_tf['merge_date'] == weather_df_tf['merge_date_w'], how='left')

### Find and handle missing values

In [None]:
joined_df.select([sum(when(col(c).isNull(),1).otherwise(0)).alias(c) for c in joined_df.columns]).show()

+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+----+----+-------+----------------+-------+----------+----+--------+------+--------+----+------------+--------+----+------+------+------------+
|distance|cab_type|time_stamp|destination|source|price|surge_multiplier| id|product_id|name|date|time|weekday|dollars_per_mile|revenue|merge_date|temp|location|clouds|pressure|rain|time_stamp_w|humidity|wind|date_w|time_w|merge_date_w|
+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+----+----+-------+----------------+-------+----------+----+--------+------+--------+----+------------+--------+----+------+------+------------+
|       0|       0|         0|          0|     0|    0|               0|  0|         0|   0|   0|   0|      0|               0|      0|         0|5573|    5573|  5573|    5573|5573|        5573|    5573|5573|  5573|  5573|        5573|
+--------+--------+----------+-----------+------+-----+-

In [None]:
joined_df = joined_df.dropna()

In [None]:
joined_df.select([sum(when(col(c).isNull(),1).otherwise(0)).alias(c) for c in joined_df.columns]).show()

+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+----+----+-------+----------------+-------+----------+----+--------+------+--------+----+------------+--------+----+------+------+------------+
|distance|cab_type|time_stamp|destination|source|price|surge_multiplier| id|product_id|name|date|time|weekday|dollars_per_mile|revenue|merge_date|temp|location|clouds|pressure|rain|time_stamp_w|humidity|wind|date_w|time_w|merge_date_w|
+--------+--------+----------+-----------+------+-----+----------------+---+----------+----+----+----+-------+----------------+-------+----------+----+--------+------+--------+----+------------+--------+----+------+------+------------+
|       0|       0|         0|          0|     0|    0|               0|  0|         0|   0|   0|   0|      0|               0|      0|         0|   0|       0|     0|       0|   0|           0|       0|   0|     0|     0|           0|
+--------+--------+----------+-----------+------+-----+-

In [None]:
joined_df.count()

Out[43]: 632403

In [None]:
joined_df.groupBy(col('cab_type')).count().show()

+--------+------+
|cab_type| count|
+--------+------+
|    Lyft|304637|
|    Uber|327766|
+--------+------+



# Find Lat and Lon using source names

In [None]:
from geopy.geocoders import Nominatim
import numpy as np
import pandas as pd
import time


distinct_sources = joined_df.select('source').distinct().rdd.flatMap(lambda x: x).collect()


geolocator = Nominatim(user_agent="app")

def geocode_with_retry(location, retries=3, delay=5):
    for _ in range(retries):
        try:
            result = geolocator.geocode(location)
            if result:
                return (result.latitude, result.longitude)
        except (TimeoutError, Exception) as e:
            print(f"Error: {e}. Retrying...")
            time.sleep(delay)
    return np.nan


lat_long = []

for location in distinct_sources:
    geo = geocode_with_retry(location)
    lat_long.append(geo)


df_lat_long = pd.DataFrame({
    'location': distinct_sources,
    'lat_long': lat_long
})

df_lat_long[['latitude', 'longitude']] = pd.DataFrame(df_lat_long['lat_long'].tolist(), index=df_lat_long.index)

df_lat_long.drop(columns=['lat_long'], inplace=True)





In [None]:
df_lat_long = spark.createDataFrame(df_lat_long, ["location_r", "source_latitude", "source_longitude"])

In [None]:
joined_df = joined_df.join(df_lat_long, joined_df.source == df_lat_long.location_r, how='left')

In [None]:
joined_df = joined_df.drop("location_r")

In [None]:
joined_df.select(['source','source_latitude','source_longitude']).distinct().show()

+--------------------+------------------+--------------------+
|              source|   source_latitude|    source_longitude|
+--------------------+------------------+--------------------+
|       North Station|        39.4661332|-0.37777229059911954|
|Northeastern Univ...|       42.33895455|  -71.08805803336392|
|         Beacon Hill|        42.3587085|          -71.067829|
|       South Station|        42.3525085|         -71.0549447|
|  Financial District|        40.7076681|          -74.009271|
|           North End|        42.3650974|         -71.0544954|
|    Haymarket Square|        42.3629502|         -71.0578447|
|   Boston University|42.350421499999996|  -71.10322831831216|
|            West End|        42.3639186|         -71.0638993|
|    Theatre District|        39.7446498|         -104.996402|
|            Back Bay|       42.35054885|  -71.08031131584724|
|              Fenway|        42.3451868|         -71.1045987|
+--------------------+------------------+--------------

## Shape of data

In [None]:
joined_df.count()

Out[60]: 632403

In [None]:
len(joined_df.columns)

Out[61]: 29

In [None]:
joined_df.columns

Out[54]: ['distance',
 'cab_type',
 'time_stamp',
 'destination',
 'source',
 'price',
 'surge_multiplier',
 'id',
 'product_id',
 'name',
 'date',
 'time',
 'weekday',
 'dollars_per_mile',
 'revenue',
 'merge_date',
 'temp',
 'location',
 'clouds',
 'pressure',
 'rain',
 'time_stamp_w',
 'humidity',
 'wind',
 'date_w',
 'time_w',
 'merge_date_w']

In [None]:
storage_account_name = "xxxxxxxxxxxxxxxxx"
storage_account_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
container_name = "cabsdatatransformed"
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"
joined_df.write.mode("overwrite").option("header", "true").parquet(output_path)