<a href="https://colab.research.google.com/github/lochanpatra/bigdata/blob/main/Bigdata.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session with explicit driver and
# executor memory settings.
spark = (
    SparkSession.builder
    .appName("NYC Taxi Data Analysis")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)


In [None]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("ParquetAnalysis") \
#     .getOrCreate()


In [None]:
# Load the trip records from the Parquet file on Drive, then preview the
# first five rows and print the schema Spark read from the file.
parquet_path = "/content/drive/MyDrive/DATA FOR USES/yellow_tripdata_2024-01.parquet"
df_parquet = spark.read.parquet(parquet_path)

df_parquet.show(n=5)
df_parquet.printSchema()


In [None]:
# Dataset size
total_rows = df_parquet.count()
print(f"Total rows: {total_rows}")

# Summary statistics for the dataset's columns
summary_stats = df_parquet.describe()
summary_stats.show()

# Distinct (payment_type, VendorID) combinations present in the data
category_pairs = df_parquet.select("payment_type", "VendorID").distinct()
category_pairs.show()


In [None]:
# Drop trips that are missing either timestamp, the passenger count or the
# trip distance.
required_columns = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
]
df_clean = df_parquet.na.drop(subset=required_columns)


In [None]:
# Keep only trips with at least one passenger and a positive distance below
# 100 (the upper bound caps outliers).
has_passengers = df_clean["passenger_count"] > 0
positive_distance = df_clean["trip_distance"] > 0
below_distance_cap = df_clean["trip_distance"] < 100

df_filtered = df_clean.where(has_passengers & positive_distance & below_distance_cap)


Add Trip Duration in minutes:

In [None]:
from pyspark.sql.functions import unix_timestamp, col

# Trip duration = (dropoff - pickup) in epoch seconds, converted to minutes.
pickup_epoch_s = unix_timestamp("tpep_pickup_datetime")
dropoff_epoch_s = unix_timestamp("tpep_dropoff_datetime")

df_transformed = df_filtered.withColumn(
    "trip_duration_minutes", (dropoff_epoch_s - pickup_epoch_s) / 60
)


Extract day of week and hour of day:

In [None]:
from pyspark.sql.functions import hour, dayofweek

# Add the pickup hour and pickup day-of-week as separate columns, both
# derived from the pickup timestamp.
df_transformed = (
    df_transformed
    .withColumn("pickup_hour", hour("tpep_pickup_datetime"))
    .withColumn("pickup_day_of_week", dayofweek("tpep_pickup_datetime"))
)


 Average trip distance and fare per day:

In [None]:
from pyspark.sql.functions import to_date, avg

# Mean trip distance and mean fare per pickup date, shown in date order.
trip_date = to_date("tpep_pickup_datetime").alias("trip_date")

daily_averages = df_transformed.groupBy(trip_date).agg(
    avg("trip_distance").alias("avg_distance"),
    avg("fare_amount").alias("avg_fare"),
)
daily_averages.orderBy("trip_date").show()


Popular pickup hours:

In [None]:
# Number of trips for each pickup hour, listed in hour order.
trips_by_pickup_hour = df_transformed.groupBy("pickup_hour").count()
trips_by_pickup_hour.orderBy("pickup_hour").show()


In [None]:
# Aggregate in Spark first (one row per pickup hour) so that only the small
# result is converted to pandas.
pickup_hours_only = df_transformed.select("pickup_hour")
df_sample = (
    pickup_hours_only
    .groupBy("pickup_hour")
    .count()
    .orderBy("pickup_hour")
)

# Bring the aggregated result to the driver as a pandas DataFrame
pdf_sample = df_sample.toPandas()


In [None]:

import matplotlib.pyplot as plt
import seaborn as sns

# Bar plot of trips per pickup hour.
plt.figure(figsize=(10, 6))
# seaborn >= 0.13 deprecates `palette=` without `hue=` (to be removed in
# 0.14). Mapping the x variable to `hue` and hiding the redundant legend keeps
# the per-bar colouring without the FutureWarning.
sns.barplot(
    data=pdf_sample,
    x="pickup_hour",
    y="count",
    hue="pickup_hour",
    palette="viridis",
    legend=False,
)

plt.title("Number of Trips per Hour of Day")
plt.xlabel("Pickup Hour")
plt.ylabel("Number of Trips")
plt.xticks(range(0, 24))
plt.grid(axis='y')
plt.tight_layout()
plt.show()


In [None]:
# Keep just the two plotted columns, drop extreme outliers, and take a
# reproducible 1% random sample before converting to pandas.
within_plot_range = (col("trip_distance") < 50) & (col("fare_amount") < 200)

df_plot = (
    df_transformed
    .select("trip_distance", "fare_amount")
    .filter(within_plot_range)
    .sample(fraction=0.01, seed=42)
)

# Convert the sample to a pandas DataFrame
pdf_plot = df_plot.toPandas()


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Scatter plot of fare against distance for the sampled trips; the low alpha
# keeps dense regions readable.
fig, ax = plt.subplots(figsize=(10, 6))
sns.scatterplot(
    data=pdf_plot,
    x="trip_distance",
    y="fare_amount",
    alpha=0.3,
    ax=ax,
)

ax.set_title("Trip Distance vs Fare Amount")
ax.set_xlabel("Trip Distance (miles)")
ax.set_ylabel("Fare Amount (USD)")
ax.grid(True)
fig.tight_layout()
plt.show()


In [None]:
from pyspark.sql.functions import to_timestamp, to_date, hour

# Derive the pickup timestamp, date and hour columns used by the aggregation
# cells below. The chain starts from `df_parquet` because `df` has not been
# assigned yet at this point in the notebook, so `df.withColumn(...)` raised
# NameError when the cells were run top to bottom.
df = df_parquet.withColumn("pickup_datetime", to_timestamp("tpep_pickup_datetime")) \
               .withColumn("pickup_date", to_date("tpep_pickup_datetime")) \
               .withColumn("pickup_hour", hour("tpep_pickup_datetime"))


In [None]:
# Trip count per pickup hour, sorted by hour.
hourly_trips = (
    df.groupBy("pickup_hour")
    .count()
    .sort("pickup_hour")
)


In [None]:
# Total fare amount per pickup date, sorted by date.
daily_fare = (
    df.groupBy("pickup_date")
    .sum("fare_amount")
    .sort("pickup_date")
)


In [None]:
# Trip count per payment type, most frequent first.
payment_type_counts = (
    df.groupBy("payment_type")
    .count()
    .sort("count", ascending=False)
)


In [None]:
# Convert the three aggregated Spark results to pandas DataFrames, in the
# order hourly, daily, payment.
pdf_hourly, pdf_daily, pdf_payment = (
    aggregated.toPandas()
    for aggregated in (hourly_trips, daily_fare, payment_type_counts)
)


In [None]:
# Re-create pdf_hourly from at most the first 24 rows of the hourly counts.
first_24_rows = hourly_trips.limit(24)
pdf_hourly = first_24_rows.toPandas()


In [None]:
from pyspark.sql.functions import to_timestamp, to_date, hour

# Rebuild `df` from the raw Parquet data with pickup/dropoff timestamps plus
# the pickup date and hour used by the aggregations that follow.
df = (
    df_parquet
    .withColumn("pickup_datetime", to_timestamp("tpep_pickup_datetime"))
    .withColumn("dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))
    .withColumn("pickup_date", to_date("tpep_pickup_datetime"))
    .withColumn("pickup_hour", hour("tpep_pickup_datetime"))
)


1. Number of trips per hour

In [None]:
# Trip count per pickup hour, sorted by hour.
hourly_counts = (
    df.groupBy("pickup_hour")
    .count()
    .sort("pickup_hour")
)
# Uncomment to preview: hourly_counts.show()


In [None]:
# Total fare amount per pickup date, sorted by date.
daily_fare = (
    df.groupBy("pickup_date")
    .sum("fare_amount")
    .sort("pickup_date")
)
# Uncomment to preview: daily_fare.show()


In [None]:
# Trip count per payment type, most frequent first.
payment_counts = (
    df.groupBy("payment_type")
    .count()
    .sort("count", ascending=False)
)
# Uncomment to preview: payment_counts.show()


In [None]:
# Convert the three aggregated Spark results to pandas DataFrames, in the
# order hourly, daily fare, payment.
pdf_hourly, pdf_daily_fare, pdf_payment = (
    aggregated.toPandas()
    for aggregated in (hourly_counts, daily_fare, payment_counts)
)


In [None]:
# Install the packages needed by the dashboard cells below: Streamlit for the
# app, pyngrok for the public tunnel, and PySpark for reading the Parquet file.
!pip install streamlit pyngrok pyspark


In [None]:
# %%writefile app.py
# import streamlit as st
# from pyspark.sql import SparkSession
# import pandas as pd
# import altair as alt

# # Initialize Spark
# spark = SparkSession.builder.appName("Parquet Viewer").getOrCreate()

# st.title("🚕 NYC Yellow Taxi Data Dashboard")

# # Path to Parquet file
# file_path = "/content/drive/MyDrive/DATA FOR USES/yellow_tripdata_2024-01.parquet"

# # Load data
# df_spark = spark.read.parquet(file_path)

# # Convert small sample to pandas
# df = df_spark.limit(5000).toPandas()

# # Show data schema
# st.subheader("📄 Data Schema")
# st.text(df_spark._jdf.schema().treeString())

# # Display raw data
# st.subheader("🧾 Sample Data")
# st.dataframe(df)

# # Add sidebar filters
# st.sidebar.header("🔍 Filter Options")
# if "passenger_count" in df.columns:
#     passenger_counts = sorted(df["passenger_count"].dropna().unique())
#     selected_passenger = st.sidebar.selectbox("Passenger Count", passenger_counts)
#     df = df[df["passenger_count"] == selected_passenger]

# # Convert datetime if needed
# if "tpep_pickup_datetime" in df.columns:
#     df["pickup_hour"] = pd.to_datetime(df["tpep_pickup_datetime"]).dt.hour

# # Visualizations
# st.subheader("📊 Visualizations")

# # Trip count by hour
# if "pickup_hour" in df.columns:
#     chart = alt.Chart(df).mark_bar().encode(
#         x=alt.X('pickup_hour:O', title="Hour of Day"),
#         y=alt.Y('count()', title="Number of Trips"),
#         tooltip=["count()"]
#     ).properties(
#         title="Trips by Pickup Hour",
#         width=600,
#         height=400
#     )
#     st.altair_chart(chart)

# # Histogram of trip distance
# if "trip_distance" in df.columns:
#     st.subheader("📏 Trip Distance Distribution")
#     st.bar_chart(df["trip_distance"].clip(upper=20).value_counts().sort_index())

# # Optional: average fare by passenger count
# if "passenger_count" in df.columns and "total_amount" in df.columns:
#     st.subheader("💵 Avg Fare by Passenger Count")
#     avg_fare = df.groupby("passenger_count")["total_amount"].mean()
#     st.bar_chart(avg_fare)


In [None]:
# %%writefile app.py
# import streamlit as st
# from pyspark.sql import SparkSession
# import pandas as pd
# import altair as alt

# # Initialize Spark
# spark = SparkSession.builder.appName("Taxi Dashboard").getOrCreate()

# # Load Parquet
# file_path = "/content/drive/MyDrive/DATA FOR USES/yellow_tripdata_2024-01.parquet"
# df_spark = spark.read.parquet(file_path)

# # Convert to Pandas (limit to manageable rows)
# df = df_spark.limit(5000).toPandas()

# # Sidebar filters
# st.sidebar.header("🔍 Filters")
# if "passenger_count" in df.columns:
#     passenger_counts = sorted(df["passenger_count"].dropna().unique())
#     selected_passenger = st.sidebar.selectbox("Passenger Count", passenger_counts)
#     df = df[df["passenger_count"] == selected_passenger]

# # Time-based features
# if "tpep_pickup_datetime" in df.columns:
#     df["pickup_hour"] = pd.to_datetime(df["tpep_pickup_datetime"]).dt.hour

# # --- Visualizations ---

# st.title("🚖 NYC Yellow Taxi Data")

# # 1. Bar chart: Trips by Pickup Hour
# if "pickup_hour" in df.columns:
#     st.subheader("⏰ Trips by Hour of Day")
#     chart = alt.Chart(df).mark_bar().encode(
#         x=alt.X('pickup_hour:O', title="Hour"),
#         y=alt.Y('count()', title="Number of Trips"),
#         tooltip=["count()"]
#     ).properties(width=600, height=400)
#     st.altair_chart(chart)

# # 2. Histogram: Trip Distance
# if "trip_distance" in df.columns:
#     st.subheader("📏 Trip Distance Distribution")
#     st.bar_chart(df["trip_distance"].clip(upper=20).value_counts().sort_index())

# # 3. Average fare by passenger count
# if "passenger_count" in df.columns and "total_amount" in df.columns:
#     st.subheader("💵 Avg Fare by Passenger Count")
#     avg_fare = df.groupby("passenger_count")["total_amount"].mean()
#     st.bar_chart(avg_fare)


In [None]:
%%writefile app.py
import streamlit as st
from pyspark.sql import SparkSession
import pandas as pd
import altair as alt

# Initialize Spark
spark = SparkSession.builder.appName("Taxi Dashboard").getOrCreate()

# Load Parquet
file_path = "/content/drive/MyDrive/DATA FOR USES/yellow_tripdata_2024-01.parquet"
df_spark = spark.read.parquet(file_path)

# Convert to Pandas (limit for Colab performance)
df = df_spark.limit(5000).toPandas()

# Sidebar filter
st.sidebar.header("🔍 Filters")
if "passenger_count" in df.columns:
    counts = sorted(df["passenger_count"].dropna().unique())
    selected_count = st.sidebar.selectbox("Passenger Count", counts)
    df = df[df["passenger_count"] == selected_count]

# Preprocessing
if "tpep_pickup_datetime" in df.columns and "tpep_dropoff_datetime" in df.columns:
    df["pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
    df["dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
    df["trip_duration_min"] = (df["dropoff_datetime"] - df["pickup_datetime"]).dt.total_seconds() / 60
    df["pickup_hour"] = df["pickup_datetime"].dt.hour

# === VISUALIZATIONS ===

st.title("🚖 NYC Yellow Taxi Data Dashboard")

# 1. Pickup Map
if "pickup_longitude" in df.columns and "pickup_latitude" in df.columns:
    st.subheader("📍 Pickup Locations Map")
    pickup_map = df[["pickup_latitude", "pickup_longitude"]].dropna()
    pickup_map = pickup_map.rename(columns={"pickup_latitude": "lat", "pickup_longitude": "lon"})
    st.map(pickup_map)

# 2. Dropoff Map
if "dropoff_longitude" in df.columns and "dropoff_latitude" in df.columns:
    st.subheader("🏁 Dropoff Locations Map")
    dropoff_map = df[["dropoff_latitude", "dropoff_longitude"]].dropna()
    dropoff_map = dropoff_map.rename(columns={"dropoff_latitude": "lat", "dropoff_longitude": "lon"})
    st.map(dropoff_map)

# 3. Trip Duration Histogram
if "trip_duration_min" in df.columns:
    st.subheader("⏱️ Trip Duration (minutes)")
    st.bar_chart(df["trip_duration_min"].clip(upper=60).value_counts().sort_index())

# 4. Trips by Hour
if "pickup_hour" in df.columns:
    st.subheader("📈 Trips by Hour of Day")
    chart = alt.Chart(df).mark_bar().encode(
        x=alt.X('pickup_hour:O', title="Hour"),
        y=alt.Y('count()', title="Trips"),
        tooltip=["count()"]
    ).properties(width=600, height=400)
    st.altair_chart(chart)

# 5. Average Fare by Passenger Count
if "passenger_count" in df.columns and "total_amount" in df.columns:
    st.subheader("💵 Avg Fare by Passenger Count")
    avg_fare = df.groupby("passenger_count")["total_amount"].mean()
    st.bar_chart(avg_fare)


In [None]:
# Register the ngrok authtoken without storing it in the notebook.
# A credential committed to a notebook is visible to anyone who can read the
# file; any token that was previously committed here should be revoked in the
# ngrok dashboard and replaced.
import os
from getpass import getpass

from pyngrok import ngrok

# Prefer the NGROK_AUTHTOKEN environment variable; otherwise ask for the token
# with a hidden prompt so it never appears in the cell source or its output.
ngrok_authtoken = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: ")
ngrok.set_auth_token(ngrok_authtoken)


In [None]:
from pyngrok import ngrok

# Shut down any ngrok process left over from an earlier run of this cell
ngrok.kill()

# Launch the Streamlit app in the background so this cell can continue
streamlit_command = 'streamlit run app.py &'
get_ipython().system_raw(streamlit_command)

# Open a public tunnel to local port 8501 and print its address
url = ngrok.connect(8501)
print("Streamlit app is live at:", url)
