In [None]:
  import os
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, month, year, countDistinct, sum as _sum, trim, concat_ws
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


# The MongoDB connection string embeds a username and password, so it is taken
# from the MONGO_URI environment variable instead of being stored in the source.
# Expected form: mongodb+srv://<user>:<password>@<host>/<database>.<collection>?<options>
mongo_output_uri = os.environ.get("MONGO_URI")
if not mongo_output_uri:
    # Fail before any download or Spark work is done rather than at write time.
    raise RuntimeError(
        "MONGO_URI environment variable is not set; "
        "export the MongoDB connection string before running this job."
    )

# Single local Spark session used by every step below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Bike Data Analysis")
    .config('spark.ui.port', '4050')
    .config('spark.driver.extraClassPath', 'jars/*')
    # Pulls the MongoDB Spark connector so the "mongo" write format is available.
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    # Restores the pre-Spark-3.0 datetime parser; presumably needed so values
    # such as "8/31/2015 23:26" parse with the "MM/dd/yyyy HH:mm" pattern.
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    # NOTE(review): this turns off Spark's ambiguous self-join check, so joins
    # between DataFrames derived from the same source must disambiguate their
    # columns themselves (e.g. with DataFrame aliases).
    .config("spark.sql.analyzer.failAmbiguousSelfJoin", "false")
    .config("spark.mongodb.output.uri", mongo_output_uri)
    .config("spark.mongodb.connection.timeout", "60000")
    .config("spark.mongodb.socket.timeout", "60000")
    .getOrCreate()
)


# Remote CSV sources keyed by dataset name. The key is also used to build the
# local file name when the file is downloaded.
data_files = dict(
    trips="https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/bike-data/201508_trip_data.csv",
    stations="https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/bike-data/201508_station_data.csv",
    geo="https://raw.githubusercontent.com/scpike/us-state-county-zip/master/geo-data.csv",
)


# Download every dataset to /tmp/<key>_data.csv.
# A failure is reported and then re-raised: continuing would make the load
# step read a missing file, or a stale copy left in /tmp by an earlier run.
for key, url in data_files.items():
    destination = f"/tmp/{key}_data.csv"
    print(f"Downloading {key} data...")
    try:
        urllib.request.urlretrieve(url, destination)
    except Exception as e:
        print(f"Failed to download {key} data: {e}")
        raise
    print(f"{key.capitalize()} data downloaded successfully.")


# Explicit schema for the trips CSV. Every field is nullable; the two date
# columns are read as plain strings here and converted to timestamps later.
trips_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Trip ID", StringType()),
        ("Duration", IntegerType()),
        ("Start Date", StringType()),
        ("Start Station", StringType()),
        ("Start Terminal", IntegerType()),
        ("End Date", StringType()),
        ("End Station", StringType()),
        ("End Terminal", IntegerType()),
        ("Bike #", StringType()),
        ("Subscriber Type", StringType()),
        ("Zip Code", StringType()),
    )
])


# Explicit schema for the stations CSV. Only the station id and dock count are
# integers; every other column (including lat/long) is kept as a string.
# All fields are nullable.
stations_schema = StructType([
    StructField(
        field_name,
        IntegerType() if field_name in ("station_id", "dockcount") else StringType(),
        True,
    )
    for field_name in (
        "station_id", "name", "lat", "long", "dockcount", "landmark", "installation"
    )
])


# Schema for the zip code lookup CSV.
# Spark applies a user-supplied schema to CSV columns by position by default
# (header names are not used for matching), so the fields must be listed in the
# file's own column order and cover every leading column.
# NOTE(review): the order below follows the header of the upstream
# scpike/us-state-county-zip geo-data.csv as I understand it
# (state_fips,state,state_abbr,zipcode,county,city) — confirm against the
# downloaded file before relying on it.
zip_code_schema = StructType([
    StructField("state_fips", StringType(), True),
    StructField("state", StringType(), True),
    StructField("state_abbr", StringType(), True),
    StructField("zipcode", StringType(), True),
    StructField("county", StringType(), True),
    StructField("city", StringType(), True)
])


# Load the three downloaded CSVs with their explicit schemas and preview each.
# The paths follow the download naming scheme /tmp/<key>_data.csv, so the geo
# lookup lives at /tmp/geo_data.csv (underscore, not hyphen).
try:
    print("Loading the trips data...")
    trips_df = spark.read.option("header", "true").schema(trips_schema).csv("/tmp/trips_data.csv")
    trips_df.show(5)

    print("Loading the stations data...")
    stations_df = spark.read.option("header", "true").schema(stations_schema).csv("/tmp/stations_data.csv")
    stations_df.show(5)

    print("Loading the zip code map data...")
    zip_code_map_df = spark.read.option("header", "true").schema(zip_code_schema).csv("/tmp/geo_data.csv")
    zip_code_map_df.show(5)

except Exception as e:
    print(f"Error loading data: {e}")
    # Re-raise: every later step needs all three DataFrames, and continuing
    # would only fail further down with an unrelated NameError.
    raise


# Convert both date columns from text to timestamps using one shared pattern.
for date_column in ("Start Date", "End Date"):
    trips_df = trips_df.withColumn(date_column, to_timestamp(col(date_column), "MM/dd/yyyy HH:mm"))
trips_df.show(5)


# Keep only trips that have both a start and an end terminal, since both are
# needed as join keys against the stations data.
has_start_terminal = col("Start Terminal").isNotNull()
has_end_terminal = col("End Terminal").isNotNull()
trips_filtered = trips_df.filter(has_start_terminal & has_end_terminal)


# Attach the landmark of the start station and of the end station to each trip.
#
# Both lookups derive from the same stations_df, so their "station_id" columns
# are the same underlying column as far as Spark is concerned. Referring to
# stations_end_df["station_id"] in the second join is therefore ambiguous: the
# left side already carries station_id from the start-station join, and Spark
# cannot tell the two apart (the check that would flag this is disabled in the
# session config). Giving each lookup a DataFrame alias and referring to the key
# by its qualified name makes each join condition unambiguous.
stations_start_df = stations_df.withColumnRenamed("landmark", "Starting Landmark").alias("start_station")

trips_with_start_landmark = trips_filtered.join(
    stations_start_df,
    col("Start Terminal") == col("start_station.station_id"),
    "inner"
)


stations_end_df = stations_df.withColumnRenamed("landmark", "Ending Landmark").alias("end_station")

trips_with_landmarks = trips_with_start_landmark.join(
    stations_end_df,
    col("End Terminal") == col("end_station.station_id"),
    "inner"
).select(
    # Only uniquely named columns are selected; the duplicated station columns
    # (station_id, name, lat, ...) from the two lookups are dropped here.
    "Trip ID", "Start Date", "Start Station", "End Station", "Duration", "Subscriber Type", "Zip Code",
    "Starting Landmark", "Ending Landmark"
)

trips_with_landmarks.show(5)


# Strip surrounding whitespace from the rider zip code before it is used as a
# join key.
trips_with_landmarks = trips_with_landmarks.withColumn("Zip Code", trim(col("Zip Code")))


print("Joining Trips DataFrame with Zip Code Map DataFrame to add states...")
# Left join so trips whose zip code has no match in the lookup are kept.
# NOTE(review): if the lookup holds more than one row per zipcode, this join
# repeats the matching trips, which would inflate the Duration sum computed
# later — confirm the lookup is unique per zip code.
zip_matches_lookup = trips_with_landmarks["Zip Code"] == zip_code_map_df["zipcode"]
trips_joined_to_zip = trips_with_landmarks.join(zip_code_map_df, zip_matches_lookup, "left")
trips_with_zip = trips_joined_to_zip.select(
    "Trip ID", "Start Date", "Start Station", "End Station", "Duration", "Subscriber Type",
    "Starting Landmark", "Ending Landmark", zip_code_map_df["state"].alias("state")
)


# Trips without a matching zip code get an explicit placeholder state.
trips_with_zip = trips_with_zip.fillna({"state": "Unknown"})
trips_with_zip.show(5)


print("Extracting Month_Year from Start Date...")
# Month_Year is "<month>/<year>" built from the trip's start timestamp; the
# month is the plain integer cast to string, so it is not zero-padded.
start_month = month(col("Start Date")).cast("string")
start_year = year(col("Start Date")).cast("string")
trips_with_zip = trips_with_zip.withColumn("Month_Year", concat_ws("/", start_month, start_year))

trips_with_zip.show(5)


print("Selecting relevant columns...")
# Only the grouping keys, the trip id and the duration are needed from here on.
final_columns = [
    "Trip ID", "Starting Landmark", "Ending Landmark", "state", "Subscriber Type", "Month_Year", "Duration"
]
final_trips_df = trips_with_zip.select(*final_columns)

final_trips_df.show(5)


print("Aggregating data...")
# One output row per (start landmark, end landmark, state, subscriber type, month).
grouping_columns = ("Starting Landmark", "Ending Landmark", "state", "Subscriber Type", "Month_Year")

# Distinct trip ids are counted; the summed Duration is divided by 60 to give
# the total that is stored as Duration_Total_Minutes.
trip_count = countDistinct("Trip ID").alias("Number_of_Trips")
total_minutes = (_sum("Duration") / 60).alias("Duration_Total_Minutes")

aggregated_df = final_trips_df.groupBy(*grouping_columns).agg(trip_count, total_minutes)

aggregated_df.show(5)


print("Saving aggregated data to MongoDB...")
# The connection string contains a username and password, so it is read from
# the MONGO_URI environment variable instead of being stored in the source.
# Expected form: mongodb+srv://<user>:<password>@<host>/<database>.<collection>?<options>
mongo_uri = os.environ.get("MONGO_URI")

if not mongo_uri:
    # Same best-effort reporting as a failed write: report and carry on.
    print("Failed to save data to MongoDB: MONGO_URI environment variable is not set")
else:
    try:
        # "append" adds to the target collection; re-running the job writes
        # the aggregated rows again rather than replacing them.
        aggregated_df.write.format("mongo")\
            .mode("append")\
            .option("uri", mongo_uri)\
            .save()
        print("Data saved successfully to MongoDB.")
    except Exception as e:
        print(f"Failed to save data to MongoDB: {e}")


# Final preview of the aggregated result, shown whether or not the save succeeded.
aggregated_df.show(5)


# Release the local Spark session and its resources.
spark.stop()


Downloading trips data...
Trips data downloaded successfully.
Downloading stations data...
Stations data downloaded successfully.
Downloading geo data...
Geo data downloaded successfully.
Loading the trips data...
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/3