In [124]:
!pwd

/Users/muratkahraman/Downloads/data-engineering-practices/Exercises/Exercise-6


In [144]:
import pandas as pd
import os
import zipfile
from pyspark.sql import SparkSession

def load_data(spark, folder_path, selected_columns):
    zip_files = [os.path.join(folder_path, file_name) for file_name in os.listdir(folder_path) if file_name.endswith(".zip")]
    dfs = []

    for zip_file in zip_files:
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith('.csv') and not file_info.filename.startswith('__MACOSX/'):
                    with zip_ref.open(file_info) as csv_file:
                        try:
                            # Read the CSV file into a pandas DataFrame using 'ISO-8859-1' encoding
                            df = pd.read_csv(csv_file, encoding='ISO-8859-1')

                            # Check if the selected columns exist in the DataFrame
                            if all(col in df.columns for col in selected_columns):
                                # Select only the desired columns
                                df = df[selected_columns]

                                # Convert the pandas DataFrame to a PySpark DataFrame
                                df = spark.createDataFrame(df)
                                dfs.append(df)
                            else:
                                print(f"Skipping file {file_info.filename} as it does not contain the selected columns.")
                        except UnicodeDecodeError:
                            print(f"Error reading file {file_info.filename}. Skipping this file.")

    if dfs:
        # Combine all DataFrames into a single DataFrame
        combined_df = dfs[0] if len(dfs) == 1 else dfs[0].unionAll(dfs[1:])
        return combined_df
    else:
        return None

# Initialize Spark
spark = SparkSession.builder.appName("AverageTripDuration").getOrCreate()

# Define the selected columns
selected_columns = ["trip_id", "start_time", "end_time", "bikeid", "tripduration", "from_station_id", "from_station_name", "to_station_id", "to_station_name", "usertype", "gender", "birthyear"]

# Specify the folder path containing the zip files
folder_path = "data"  # Replace with your folder path

# Load and process the data
result = load_data(spark, folder_path, selected_columns)


Skipping file Divvy_Trips_2020_Q1.csv as it does not contain the selected columns.


TypeError: field gender: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

In [None]:
from pyspark.sql.functions import to_date, avg

def calculate_avg_trip_duration_per_day(data):
    # Convert 'start_time' to a date type
    data = data.withColumn('start_date', to_date(data['start_time']))

    # Group by date and calculate the average trip duration
    avg_duration_per_day = data.groupBy('start_date').agg(avg('tripduration').alias('avg_trip_duration'))

    return avg_duration_per_day


In [None]:
def count_trips_per_day(data):
    trips_per_day = data.groupBy('start_date').count()
    return trips_per_day


In [None]:
def compare_trip_duration_by_gender(data):
    # Group by 'gender' and calculate the average trip duration
    avg_duration_by_gender = data.groupBy('gender').agg(avg('tripduration').alias('avg_trip_duration'))

    return avg_duration_by_gender


In [None]:
selected_columns=["trip_id","start_time","end_time","bikeid","tripduration","from_station_id","from_station_name","to_station_id","to_station_name","usertype","gender","birthyear"]
load_data(spark, "data",selected_columns)

In [145]:
zip_file_1

['ride_id',
 'rideable_type',
 'started_at',
 'ended_at',
 'start_station_name',
 'start_station_id',
 'end_station_name',
 'end_station_id',
 'start_lat',
 'start_lng',
 'end_lat',
 'end_lng',
 'member_casual']

In [146]:
zip_file_2

['trip_id',
 'start_time',
 'end_time',
 'bikeid',
 'tripduration',
 'from_station_id',
 'from_station_name',
 'to_station_id',
 'to_station_name',
 'usertype',
 'gender',
 'birthyear']

In [2]:
from pyspark.sql import SparkSession
import os
import zipfile
import tempfile

def count_records_in_csv_files(folder_path, spark):
    zip_files = [os.path.join(folder_path, file_name) for file_name in os.listdir(folder_path) if file_name.endswith(".zip")]

    for zip_file in zip_files:
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith('.csv'):
                    with zip_ref.open(file_info) as csv_file:
                        # Create a temporary file to store the CSV contents
                        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                            temp_file.write(csv_file.read())
                            temp_file_name = temp_file.name

                        # Read the CSV file from the temporary file and create a DataFrame
                        try:
                            df = spark.read.option("header", "true").csv(temp_file_name, inferSchema=True)
                            # Print the file name and the count of records in the DataFrame
                            print(f"File: {file_info.filename}, Record Count: {df.count()}")
                        except:
                            # Handle any errors that occur during reading
                            print(f"Skipping file {file_info.filename} due to an error")

                        # Remove the temporary file
                        os.remove(temp_file_name)


# Create a SparkSession
spark = SparkSession.builder.appName("CountCSVRecords").getOrCreate()

# Define the path to the folder containing the zip files
folder_path = "data"  # Replace with the path to your data folder

# Call the function to count records in CSV files
count_records_in_csv_files(folder_path, spark)

# Stop the SparkSession
spark.stop()


                                                                                

File: Divvy_Trips_2020_Q1.csv, Record Count: 426887
File: __MACOSX/._Divvy_Trips_2020_Q1.csv, Record Count: 0


                                                                                

File: Divvy_Trips_2019_Q4.csv, Record Count: 704054
File: __MACOSX/._Divvy_Trips_2019_Q4.csv, Record Count: 0


In [51]:
def count_records_in_csv_files(folder_path, spark):
    zip_files = [os.path.join(folder_path, file_name) for file_name in os.listdir(folder_path) if file_name.endswith(".zip")]

    for zip_file in zip_files:
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith('.csv'):
                    with zip_ref.open(file_info) as csv_file:
                        # Create a temporary file to store the CSV contents
                        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                            temp_file.write(csv_file.read())
                            temp_file_name = temp_file.name

                        # Read the CSV file from the temporary file and create a DataFrame
                        try:
                            df = spark.read.option("header", "true").csv(temp_file_name, inferSchema=True)
                            record_count = df.count()
                            if record_count > 0:
                                # Print the file name and the count of records in the DataFrame
                                print(f"File: {file_info.filename}, Record Count: {record_count}")
                        except:
                            # Handle any errors that occur during reading
                            print(f"Skipping file {file_info.filename} due to an error")

                        # Remove the temporary file
                        os.remove(temp_file_name)


In [52]:
# Create a SparkSession
spark = SparkSession.builder.appName("CountCSVRecords").getOrCreate()

# Define the path to the folder containing the zip files
folder_path = "data"  # Replace with the path to your data folder

# Call the function to count records in CSV files
count_records_in_csv_files(folder_path, spark)

# Stop the SparkSession
spark.stop()

23/10/26 22:42:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


                                                                                

File: Divvy_Trips_2020_Q1.csv, Record Count: 426887


                                                                                

File: Divvy_Trips_2019_Q4.csv, Record Count: 704054


In [53]:
from pyspark.sql.functions import date_format

def trips_per_day(df):
    # Extract the date from the 'start_time' column
    df = df.withColumn("start_date", date_format("start_time", "yyyy-MM-dd"))
    # Group by date and count the number of trips
    result = df.groupBy("start_date").count()
    return result


In [54]:
def popular_starting_stations_by_month(df):
    # Extract the month from the 'start_time' column
    df = df.withColumn("start_month", date_format("start_time", "yyyy-MM"))
    # Group by month and starting station, and count the trips
    result = df.groupBy("start_month", "from_station_name").count()
    # Find the station with the highest count in each month
    result = result.orderBy("start_month", col("count").desc()).groupBy("start_month").agg(
        first("from_station_name").alias("most_popular_starting_station")
    )
    return result


In [138]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank,dense_rank

def top_trip_stations_last_two_weeks(df):
    # Calculate the date difference from the current date
    df = df.withColumn("days_ago", datediff(current_date(), "start_time"))
    # Filter the data for the last two weeks
    df = df.filter((col("days_ago") >= 0) & (col("days_ago") <= 14))
    # Create a window specification for ranking
    window_spec = Window.partitionBy("start_date").orderBy(col("count").desc())
    # Rank the stations by the number of trips for each day
    result = df.withColumn("station_rank", rank().over(window_spec))
    # Filter the top 3 stations for each day
    result = result.filter(col("station_rank") <= 3).drop("station_rank")
    return result


In [57]:
from pyspark.sql.functions import avg

def average_trip_duration_by_gender(df):
    result = df.groupBy("gender").agg(avg("tripduration").alias("avg_trip_duration"))
    return result


In [58]:
def top_10_ages_longest_and_shortest_trips(df):
    result = df.groupBy("birthyear").agg(avg("tripduration").alias("avg_trip_duration"))
    # Sort by average trip duration and select the top 10 and bottom 10 ages
    result = result.orderBy("avg_trip_duration", ascending=False).limit(10)
    return result


In [59]:
def perform_data_analysis(df):
    # Call each function and save the results to CSV files
    trips_per_day_result = trips_per_day(df)
    trips_per_day_result.write.csv("reports/trips_per_day.csv", header=True, mode="overwrite")

    popular_stations_result = popular_starting_stations_by_month(df)
    popular_stations_result.write.csv("reports/popular_starting_stations_by_month.csv", header=True, mode="overwrite")

    top_stations_result = top_trip_stations_last_two_weeks(df)
    top_stations_result.write.csv("reports/top_trip_stations_last_two_weeks.csv", header=True, mode="overwrite")

    gender_duration_result = average_trip_duration_by_gender(df)
    gender_duration_result.write.csv("reports/average_trip_duration_by_gender.csv", header=True, mode="overwrite")

    age_duration_result = top_10


In [None]:
spark.stop()

In [244]:
import os
import zipfile
import tempfile
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql import DataFrame
from pyspark.sql.functions import date_format, datediff, col, rank, avg, first,desc,row_number,lit
from pyspark.sql.window import Window


# Initialize a SparkSession
spark = SparkSession.builder.appName("DivvyTripsAnalysis").getOrCreate()

# Define the function to load data from zipped CSV files
def load_data(folder_path):
    zip_files = [os.path.join(folder_path, file_name) for file_name in os.listdir(folder_path) if file_name.endswith(".zip")]
    data_frames = []

    for zip_file in zip_files:
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith('.csv'):
                    with zip_ref.open(file_info) as csv_file:
                        try:
                            # Create a temporary file to store the CSV contents
                            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                                temp_file.write(csv_file.read())
                                temp_file_name = temp_file.name

                            # Read the CSV file from the temporary file and create a DataFrame
                            df = spark.read.option("header", "true").csv(temp_file_name, inferSchema=True)
                            if all(col_name in df.columns for col_name in selected_columns):
                                data_frames.append(df)
                            else:
                                print(f"Skipping file {file_info.filename} as it does not contain the selected columns.")
                        except Exception as e:
                            # Handle any errors that occur during reading
                            print(f"Skipping file {file_info.filename} due to an error: {str(e)}")

                            # Remove the temporary file in case of an error
                            os.remove(temp_file_name)
    
    return data_frames

# Define the function to combine DataFrames into one DataFrame
def combine_dataframes(data_frames):
    if data_frames:
        combined_df = data_frames[0]
        for i in range(1, len(data_frames)):
            combined_df = combined_df.union(data_frames[i])
        return combined_df
    else:
        return None

# Define the selected columns to filter by
selected_columns = ["trip_id", "start_time", "end_time", "bikeid", "tripduration", "from_station_id", "from_station_name",
                    "to_station_id", "to_station_name", "usertype", "gender", "birthyear"]

# Define the path to the folder containing the zip files
folder_path = "data"  # Replace with the path to your data folder

# Load data and combine DataFrames
data_frames = load_data(folder_path)
combined_df = combine_dataframes(data_frames)

# Check if combined_df is not None
if combined_df:
    
    
    # 1. What is the average trip duration per day?
    def average_trip_duration_per_day(df: DataFrame):
        result = df.withColumn("start_date", date_format("start_time", "yyyy-MM-dd"))
        result = result.dropDuplicates()
        result = result.groupBy("start_date").agg(avg("tripduration").alias("avg_trip_duration"))
        result = result.orderBy("start_date")  # Sort by start_date in ascending order
        result.write.csv("reports/average_trip_duration_per_day.csv", header=True, mode="overwrite")
    
    # 2. How many trips were taken each day?
    def trips_per_day(df: DataFrame):
        result = df.withColumn("start_date", date_format("start_time", "yyyy-MM-dd"))
        result = result.dropDuplicates()
        result = result.groupBy("start_date").count()
        result = result.orderBy("start_date") 
        result.write.csv("reports/trips_per_day.csv", header=True, mode="overwrite")
    
    # 3. What was the most popular starting trip station for each month?
    def popular_starting_stations_by_month(df: DataFrame):
        result = df.withColumn("start_month", date_format("start_time", "yyyy-MM"))
        result = result.dropDuplicates()
        result = result.groupBy("start_month", "from_station_id", "from_station_name").count()
        result = result.orderBy("count").sort(desc("count"))
        window = Window.partitionBy("start_month").orderBy(desc("count"))
        result=result.withColumn("station_rank", row_number().over(window))
        result = result.filter(col("station_rank") == 1).drop("station_rank")
        result.write.csv("reports/popular_starting_stations_by_month.csv", header=True, mode="overwrite")

    # 4. What were the top 3 trip stations each day for the last two weeks?
    def top_trip_stations_last_two_weeks(df: DataFrame):
        result = df.withColumn("start_date", date_format("start_time", "yyyy-MM-dd"))
        result = result.dropDuplicates()
        last_date = result.agg({"start_date": "max"}).collect()[0][0]
        result = result.withColumn("days_ago", datediff(lit(last_date), "start_date"))
        result = result.filter((col("days_ago") >= 0) & (col("days_ago") <= 14))
        result = result.groupBy("start_date", "from_station_id", "from_station_name").count()
        result = result.orderBy("count").sort(desc("count"))
        window = Window.partitionBy("start_date").orderBy(desc("count"))
        result=result.withColumn("station_rank", row_number().over(window))
        result = result.filter(col("station_rank") <= 3).drop("station_rank")
        result.write.csv("reports/top_trip_stations_last_two_weeks.csv", header=True, mode="overwrite")

        
    # 5. Do Males or Females take longer trips on average?
    def average_trip_duration_by_gender(df: DataFrame):
        result = df.withColumn("tripduration", col("tripduration").cast(DoubleType()))
        result = df.groupBy("gender").agg(avg("tripduration").alias("avg_trip_duration"))
        result.write.csv("reports/average_trip_duration_by_gender.csv", header=True, mode="overwrite")

    # 6. What is the top 10 ages of those that take the longest trips, and shortest?
    def top_10_ages_longest_and_shortest_trips(df: DataFrame):
        result = df.withColumn("birthyear", col("birthyear").cast(DoubleType()))
        result = df.groupBy("birthyear").agg(avg("tripduration").alias("avg_trip_duration"))
        result_longest = result.orderBy(col("avg_trip_duration").desc()).limit(10)
        result_shortest = result.orderBy(col("avg_trip_duration")).limit(10)
        result_longest.write.csv("reports/top_10_ages_longest_trips.csv", header=True, mode="overwrite")
        result_shortest.write.csv("reports/top_10_ages_shortest_trips.csv", header=True, mode="overwrite")

    trips_per_day(combined_df)
    average_trip_duration_per_day(combined_df)
    popular_starting_stations_by_month(combined_df)
    top_trip_stations_last_two_weeks(combined_df)
    average_trip_duration_by_gender(combined_df)
    top_10_ages_longest_and_shortest_trips(combined_df)

else:
    print("No valid data loaded.")

# Stop the SparkSession
spark.stop()


                                                                                

Skipping file Divvy_Trips_2020_Q1.csv as it does not contain the selected columns.
Skipping file __MACOSX/._Divvy_Trips_2020_Q1.csv as it does not contain the selected columns.


                                                                                

Skipping file __MACOSX/._Divvy_Trips_2019_Q4.csv as it does not contain the selected columns.


                                                                                

In [None]:
spark = SparkSession.builder.appName("Exercise6").enableHiveSupport().getOrCreate()
