In [None]:
import io
import os
import re
import shutil
import tempfile
import time
from datetime import datetime, timedelta
from typing import List

import pandas as pd
import requests
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession

In [None]:

class ExtractData:
    """Extract NYC yellow-taxi trip records and hourly weather for a date range.

    Both extractors return Spark DataFrames created with the session built in
    ``__init__``. Trip files are downloaded to local temporary files, which
    Spark reads lazily; call :meth:`cleanup_temp_files` once the returned
    DataFrame has been written out or cached.
    """

    # File names look like ``yellow_tripdata_2024-01.parquet``; group 2 is the month.
    _YELLOW_FILE_PATTERN = re.compile(r"(yellow_tripdata_)(\d{4}-\d{2})\.parquet", re.IGNORECASE)

    # Keys copied from every entry of a day's "hours" list in the weather response.
    _WEATHER_HOUR_FIELDS = ('datetime', 'temp', 'dew', 'humidity', 'precip',
                            'preciptype', 'snow', 'snowdepth', 'visibility')

    # Top-level response keys kept once as query metadata.
    _WEATHER_METADATA_KEYS = ("queryCost", "latitude", "longitude", "resolvedAddress",
                              "address", "timezone", "tzoffset", "description")

    def __init__(self,start_date:str, end_date:str):
        """Parse the inclusive date range and obtain a Spark session.

        Args:
            start_date: First day of the range, formatted ``YYYY-MM-DD``.
            end_date: Last day of the range, formatted ``YYYY-MM-DD``.

        Raises:
            ValueError: If either date does not match ``%Y-%m-%d``.
        """
        self.start_date = datetime.strptime(start_date, "%Y-%m-%d")
        self.end_date = datetime.strptime(end_date, "%Y-%m-%d")
        self.spark = SparkSession.builder.appName('NYC_taxi').getOrCreate()
        # Temp parquet files written by this instance (see cleanup_temp_files).
        self._tmp_paths = []
        # Metadata of the first successful weather response.
        self.weather_metadata = {}

    def _month_in_range(self, file_date) -> bool:
        """Return True when the month starting at ``file_date`` overlaps the range.

        Files are published per month, so the lower bound is the first day of
        the start month; otherwise a mid-month ``start_date`` would exclude
        the file that contains it.
        """
        return self.start_date.replace(day=1) <= file_date <= self.end_date

    def _select_links(self, hrefs) -> List[tuple]:
        """Pick ``(YYYY-MM, absolute_url)`` pairs for yellow-taxi files in range.

        Args:
            hrefs: Iterable of raw ``href`` attribute strings.
        """
        selected = []
        for raw_href in hrefs:
            href = raw_href.strip()  # tolerate stray whitespace around the URL
            match = self._YELLOW_FILE_PATTERN.search(href)
            if not match:
                continue
            date_str = match.group(2)
            if self._month_in_range(datetime.strptime(date_str, "%Y-%m")):
                full_url = href if href.startswith("http") else f"https://www.nyc.gov{href}"
                selected.append((date_str, full_url))
        return selected

    def _month_windows(self) -> List[tuple]:
        """Split the date range into ``(first_day, last_day)`` pairs, one per month."""
        windows = []
        current_date = self.start_date
        while current_date <= self.end_date:
            # Day 28 exists in every month; +4 days always lands in the next month.
            next_month = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1)
            month_end = min(next_month - timedelta(days=1), self.end_date)
            windows.append((current_date, month_end))
            current_date = next_month
        return windows

    def _download_to_temp_file(self, link: str, timeout: float):
        """Stream ``link`` into a temporary ``.parquet`` file.

        Returns:
            The temp file path, or ``None`` when the server did not answer 200.
        """
        with requests.get(link, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                return None
            # delete=False: Spark reads the file lazily, after this method returns.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") as tmp_file:
                self._tmp_paths.append(tmp_file.name)
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    tmp_file.write(chunk)
                return tmp_file.name

    def cleanup_temp_files(self):
        """Delete the temp parquet files downloaded by this instance.

        Only call this after the extracted DataFrame has been materialised
        (written or cached), because Spark reads the files lazily.
        """
        paths = getattr(self, "_tmp_paths", [])
        while paths:
            try:
                os.remove(paths.pop())
            except FileNotFoundError:
                pass

    def extract_nyc_yellow_taxi_data(self,
                 url:str ="https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page",
                 timeout: float = 60):
        """Download the yellow-taxi parquet files for the range into one DataFrame.

        Args:
            url: Page whose links point at the monthly parquet files.
            timeout: Seconds to wait on each HTTP request.

        Returns:
            A Spark DataFrame with all months combined by column name, or
            ``None`` when nothing was downloaded or an error occurred (the
            error is printed).
        """
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, "html.parser")

            hrefs = [link["href"] for link in soup.find_all("a", href=True)]
            download_links = self._select_links(hrefs)

            yellow_taxi_dfs = None
            for date_str, link in download_links:
                print(f"Downloading {date_str} from {link}")
                tmp_path = self._download_to_temp_file(link, timeout)
                if tmp_path is None:
                    print(f"Failed to download {link}")
                    continue

                df = self.spark.read.parquet(tmp_path)
                if yellow_taxi_dfs is None:
                    yellow_taxi_dfs = df
                else:
                    # Align by name: monthly files are not guaranteed to share
                    # column order or the exact column set.
                    yellow_taxi_dfs = yellow_taxi_dfs.unionByName(df, allowMissingColumns=True)

            if yellow_taxi_dfs is None:
                print("No data found for the given date range.")
                return None

            print(f"Data downloaded successfully: {yellow_taxi_dfs.count()} rows, {len(yellow_taxi_dfs.columns)} columns")
            return yellow_taxi_dfs

        except Exception as e:
            # Report and return None so callers never receive an error string
            # where a DataFrame is expected.
            print(f"Error: {e}")
            return None

    def extract_nyc_network_data(self, API_KEY: str,
                                location: str = "New York NY United States",
                                base_url: str = "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline",
                                pause_seconds: float = 3,
                                timeout: float = 60):
        """Fetch hourly weather for the date range, one request per calendar month.

        Despite its name this method retrieves weather data; the name is kept
        for existing callers and ``extract_nyc_weather_data`` is an alias.

        Args:
            API_KEY: Key sent as the ``key`` query parameter.
            location: Location segment of the request URL.
            base_url: Timeline endpoint the location and dates are appended to.
            pause_seconds: Delay between consecutive requests.
            timeout: Seconds to wait on each HTTP request.

        Returns:
            A Spark DataFrame with one row per hour (the selected fields plus
            the owning ``day``), or ``None`` when no records were retrieved.
            Months whose request fails are reported and skipped.
        """
        all_hourly_records = []
        metadata = {}

        windows = self._month_windows()
        for index, (window_start, window_end) in enumerate(windows):
            start_date_str = window_start.strftime("%Y-%m-%d")
            end_date_str = window_end.strftime("%Y-%m-%d")

            params = {
                "unitGroup": "us",
                "key": API_KEY,
                "include": "days,hours,current,alerts,stations",
                "contentType": "json"
            }

            url = f"{base_url}/{location}/{start_date_str}/{end_date_str}"
            print(f"Fetching data from {start_date_str} to {end_date_str}...")

            response = requests.get(url, params=params, timeout=timeout)

            if response.status_code == 200:
                data = response.json()

                # Metadata is identical for every month, so keep the first copy.
                if not metadata:
                    metadata = {k: data.get(k) for k in self._WEATHER_METADATA_KEYS}

                days = data.get("days") or []
                for day in days:
                    for hour in day.get("hours") or []:
                        filtered_hour = {key: hour.get(key) for key in self._WEATHER_HOUR_FIELDS}
                        filtered_hour["day"] = day.get("datetime")
                        all_hourly_records.append(filtered_hour)

                print(f"Retrieved {len(days)} days.")

            elif response.status_code == 429:
                print(f" Rate limit hit ({response.status_code}): {response.text}")
            else:
                print(f"Error {response.status_code}: {response.text}")

            # Pause between requests only; nothing follows the last one.
            if index < len(windows) - 1:
                time.sleep(pause_seconds)

        self.weather_metadata = metadata

        if not all_hourly_records:
            print("No data retrieved.")
            return None

        df_hours = self.spark.createDataFrame(all_hourly_records)

        # Count the distinct days actually received rather than assuming
        # exactly 24 hourly rows per day.
        total_days = len({record["day"] for record in all_hourly_records})
        print(f"\nRetrieved data contains {total_days} days of data.")

        return df_hours

    # Accurately named alias for the weather extractor.
    extract_nyc_weather_data = extract_nyc_network_data


In [None]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, to_timestamp, concat_ws, round as spark_round
from pyspark.sql.types import StringType

class DataTransformation:
    """Decode, enrich and merge the extracted taxi and weather DataFrames."""

    # Code -> label tables for the coded taxi columns.
    _VENDOR_LABELS = {
        1: "Creative Mobile Technologies, LLC",
        2: "Curb Mobility, LLC",
        6: "Myle Technologies Inc",
        7: "Helix"
    }
    _RATECODE_LABELS = {
        1: "Standard rate",
        2: "JFK",
        3: "Newark",
        4: "Nassau or Westchester",
        5: "Negotiated fare",
        6: "Group ride",
        99: "Null/unknown"
    }
    _STORE_AND_FORWARD_LABELS = {
        "N": "No",
        "Y": "Yes"
    }
    _PAYMENT_TYPE_LABELS = {
        0: "Flex Fair Trip",
        1: "Credit card",
        2: "Cash",
        3: "No charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided trip"
    }

    _ZONE_LOOKUP_URL = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

    def __init__(self, nyc_taxi_data: DataFrame, nyc_weather_data: DataFrame, spark: SparkSession):
        """Store the two input DataFrames and the Spark session used for lookups.

        Args:
            nyc_taxi_data: Raw yellow-taxi trips.
            nyc_weather_data: Hourly weather rows with ``day`` and ``datetime`` columns.
            spark: Session used to build the zone lookup DataFrame.
        """
        self.nyc_taxi_data = nyc_taxi_data
        self.nyc_weather_data = nyc_weather_data
        self.spark = spark

    @staticmethod
    def _decode_numeric_column(df, column: str, labels: dict):
        """Replace numeric codes in ``column`` with text labels.

        ``DataFrame.replace`` rejects mappings that mix key and value types
        (int -> str), so the column is first cast to a whole-number string and
        the mapping is applied with string keys. Codes missing from ``labels``
        stay as their numeric string; nulls stay null.
        """
        string_labels = {str(code): label for code, label in labels.items()}
        as_text = df.withColumn(column, col(column).cast("long").cast("string"))
        return as_text.replace(string_labels, subset=[column])

    def _load_zone_lookup(self, lookup_url: str):
        """Fetch the taxi-zone CSV and return it as a Spark DataFrame.

        The CSV is read with pandas because Spark's file readers typically
        cannot open an ``https://`` URL. ``keep_default_na=False`` keeps
        literal "N/A" cells as strings instead of turning them into NaN.
        """
        zones = pd.read_csv(lookup_url, keep_default_na=False)
        return self.spark.createDataFrame(zones)

    @staticmethod
    def _join_zone(df, lookup_table, prefix: str):
        """Left-join zone details for ``<prefix>LocationID`` (prefix is ``PU`` or ``DO``)."""
        renamed = (
            lookup_table.withColumnRenamed("LocationID", f"{prefix}LocationID_lookup")
                        .withColumnRenamed("Borough", f"{prefix}Location_borough")
                        .withColumnRenamed("Zone", f"{prefix}Location_zone")
                        .withColumnRenamed("service_zone", f"{prefix}Location_service_zone")
        )
        return df.join(
            renamed,
            df[f"{prefix}LocationID"] == col(f"{prefix}LocationID_lookup"),
            "left"
        )

    def transform_nyc_yellow_taxi_data(self, lookup_url: str = _ZONE_LOOKUP_URL) -> DataFrame:
        """Decode coded columns, normalise timestamps and attach zone names.

        Args:
            lookup_url: Location of the taxi-zone lookup CSV.

        Returns:
            The taxi DataFrame with labelled ``VendorID``, ``RatecodeID``,
            ``store_and_fwd_flag`` and ``payment_type`` columns, new
            ``pickup_datetime`` / ``dropoff_datetime`` columns, pickup and
            drop-off zone details, and the raw location-id and ``tpep_*``
            columns removed.
        """
        df = self.nyc_taxi_data

        df = self._decode_numeric_column(df, "VendorID", self._VENDOR_LABELS)
        df = self._decode_numeric_column(df, "RatecodeID", self._RATECODE_LABELS)
        # Already a str -> str mapping, so replace() accepts it as is.
        df = df.replace(self._STORE_AND_FORWARD_LABELS, subset=["store_and_fwd_flag"])
        df = self._decode_numeric_column(df, "payment_type", self._PAYMENT_TYPE_LABELS)

        # Convert pickup and dropoff datetime
        df = df.withColumn("pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
               .withColumn("dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))

        lookup_table = self._load_zone_lookup(lookup_url)
        df = self._join_zone(df, lookup_table, "PU")
        df = self._join_zone(df, lookup_table, "DO")

        # Drop the join keys and the raw timestamp columns that were replaced.
        df = df.drop("PULocationID", "DOLocationID",
                     "tpep_pickup_datetime", "tpep_dropoff_datetime",
                     "PULocationID_lookup", "DOLocationID_lookup")

        return df

    def transform_weather_data(self) -> DataFrame:
        """Merge the ``day`` and ``datetime`` columns into one timestamp column.

        Returns:
            The weather DataFrame with ``datetime`` replaced by a timestamp
            parsed from ``"<day> <datetime>"`` and the ``day`` column dropped.
        """
        df = self.nyc_weather_data

        df = df.withColumn(
            "datetime",
            to_timestamp(concat_ws(" ", col("day"), col("datetime")))
        ).drop("day")

        return df

    def merge_data(self) -> DataFrame:
        """Join each trip to the weather row for its nearest pickup hour.

        Returns:
            The transformed taxi rows left-joined with the transformed weather
            rows; the helper join columns are dropped from the result.
        """
        taxi_df = self.transform_nyc_yellow_taxi_data()
        weather_df = self.transform_weather_data()

        # Round pickup time to the nearest hour: epoch seconds -> hours -> seconds.
        rounded_epoch = spark_round(col("pickup_datetime").cast("double") / 3600) * 3600
        taxi_df = taxi_df.withColumn("new_pickup_datetime", rounded_epoch)
        taxi_df = taxi_df.withColumn("new_pickup_datetime", to_timestamp(col("new_pickup_datetime")))

        # Join on datetime
        merged_df = taxi_df.join(
            weather_df,
            taxi_df.new_pickup_datetime == weather_df.datetime,
            "left"
        ).drop("new_pickup_datetime", "datetime")

        return merged_df

    def transform(self) -> DataFrame:
        """Run the full transformation and return the merged DataFrame."""
        return self.merge_data()


In [None]:
import os
from pyspark.sql import DataFrame, SparkSession

class LoadData:
    """Persist the transformed data as a local Parquet dataset, de-duplicating on append."""

    def __init__(self, transformed_data: DataFrame, file_name: str, spark: SparkSession):
        """Store what to write, where to write it and the session used to re-read it.

        Args:
            transformed_data: DataFrame to persist.
            file_name: Local path of the Parquet dataset directory.
            spark: Session used to read an existing dataset.
        """
        self.transformed_data = transformed_data
        self.file_name = file_name
        self.spark = spark

    def save(self):
        """Write the data, merging it with any dataset already at ``file_name``.

        When the target is missing or empty the data is written directly.
        Otherwise the existing rows and the new rows are unioned by column
        name, exact duplicates are removed, and the result replaces the
        dataset.

        Returns:
            ``(row_count, column_count)`` of ``transformed_data``.
        """
        target = self.file_name
        # isdir() also guards listdir() against a plain file sitting at the path.
        has_existing_data = os.path.isdir(target) and len(os.listdir(target)) > 0

        if not has_existing_data:
            # Save as new Parquet dataset
            self.transformed_data.write.mode("overwrite").parquet(target)
            print("✅ Data saved successfully (new file or empty file)")
        else:
            existing_data = self.spark.read.parquet(target)
            combined_data = existing_data.unionByName(self.transformed_data).dropDuplicates()

            # Spark reads lazily, so overwriting `target` while it is still an
            # input of `combined_data` would fail or lose data. Write to a
            # sibling staging directory first, then swap it into place.
            staging = os.path.normpath(target) + ".staging"
            if os.path.exists(staging):
                shutil.rmtree(staging)  # left over from an interrupted run
            combined_data.write.mode("overwrite").parquet(staging)

            shutil.rmtree(target)
            os.replace(staging, target)
            print("✅ Data appended successfully")

        return (self.transformed_data.count(), len(self.transformed_data.columns))


In [None]:
import os
os.chdir('../')
%pwd

In [None]:
from src.NYC_taxi_data_pipeline.utils.custom_utils import read_yaml
# NOTE(review): the star import hides where names come from; CONFIG_PATH used
# below is presumably defined in `constants` — confirm and import it explicitly.
from src.NYC_taxi_data_pipeline.constants import *

# Load the project configuration through read_yaml; the next cell reads
# settings from DATA by attribute access.
DATA = read_yaml(CONFIG_PATH)

In [None]:
# Display the Artifact_dir setting of the DATA_EXTRACTION_CONFIG section
# (presumably the output directory of the extraction step — confirm in the config).
DATA.DATA_EXTRACTION_CONFIG.Artifact_dir

In [None]:
from dotenv import load_dotenv
load_dotenv()

api_key = os.getenv('API_KEY')

# Report only whether the key is present: printing the value would store the
# secret in the saved notebook output.
print("API_KEY loaded" if api_key else "API_KEY is not set")