In [0]:
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col, lit, when
import requests
import json
import time

Loading config file

In [0]:
# Load the run configuration (API token, date range, endpoint URL) from the
# workspace JSON file. The encoding is pinned to UTF-8 so that parsing does not
# depend on the locale default of whichever cluster runs the notebook.
with open(
    "/Workspace/Users/arturvieirasousa@gmail.com/ifd-tech-ch/src/config.json",
    "r",
    encoding="utf-8",
) as f:
    conf_file = json.load(f)

In [0]:
# Paginated extraction of taxi-trip rows from the API into a Spark DataFrame.
#
# Inputs : `conf_file` (app token, date range, endpoint URL) loaded earlier.
# Outputs: `all_data` (list of row dicts), `total_rows_fetched`, and `df`.
# Raises : requests.exceptions.RequestException if a page cannot be fetched,
#          ValueError if the payload is not a list or no rows were returned.
#          Failing here is deliberate: a partial extraction must not be passed
#          on to the cells below, which overwrite stored data.
app_token = conf_file["app_token"]
start_date = conf_file["start_date"]
end_date = conf_file["end_date"]
api_url = conf_file["api_url"]

# Defining limits to the api calls
batch_size = 50000       # rows requested per page ($limit)
wait_time = 5            # seconds to pause between pages
request_timeout = 60     # seconds before a stalled HTTP call is abandoned

# Defining SOQL Query
order_by = "tpep_pickup_datetime"
select_clause = "vendorid, passenger_count, total_amount, tpep_pickup_datetime, tpep_dropoff_datetime"
where_clause = f"tpep_pickup_datetime >= '{start_date}' AND tpep_dropoff_datetime <= '{end_date}'"

# Fetch Data
all_data = []
offset = 0
total_rows_fetched = 0

while True:
    api_params = {
        "$$app_token": app_token,
        "$select": select_clause,
        "$where": where_clause,
        "$limit": batch_size,
        "$offset": offset,
        # Many trips share a pickup timestamp; the ":id" system field breaks
        # those ties so consecutive $offset pages neither overlap nor skip rows.
        "$order": f"{order_by} ASC, :id",
    }

    try:
        response = requests.get(api_url, params=api_params, timeout=request_timeout)
        response.raise_for_status()
        data_rows = response.json()
    except requests.exceptions.RequestException as e:
        # Report where the extraction stopped, then fail the cell instead of
        # continuing with an incomplete dataset.
        print(f"\nError fetching data at offset {offset}: {e}")
        raise

    # A non-list payload (e.g. an error object) would otherwise be merged into
    # all_data silently.
    if not isinstance(data_rows, list):
        raise ValueError(
            f"Unexpected API payload at offset {offset}: expected a list of rows, "
            f"got {type(data_rows).__name__}"
        )

    rows_in_batch = len(data_rows)

    if rows_in_batch == 0:
        print("\nExtraction complete: API returned zero rows.")
        break  # Exit the loop if no more data is returned

    # Adding the current batch to the main list
    all_data.extend(data_rows)
    total_rows_fetched += rows_in_batch

    print(f"Batch {offset // batch_size + 1} fetched: {rows_in_batch} rows. Total: {total_rows_fetched}")

    # If the batch size is less than the limit, we've reached the end
    if rows_in_batch < batch_size:
        print("Final batch size was less than the limit. Extraction complete.")
        break

    # Prepare for the next iteration
    offset += batch_size

    # Pause to avoid hitting rate limits
    print(f"Pausing for {wait_time} seconds...")
    time.sleep(wait_time)

# --- PySpark DataFrame Creation ---

print("\n--- Data Processing ---")
print(f"Successfully fetched a total of {total_rows_fetched} rows.")

# Spark cannot infer a schema from an empty list; fail with a clear message.
if not all_data:
    raise ValueError(
        f"No rows returned for {start_date} .. {end_date}; nothing to load into a DataFrame."
    )

df = spark.createDataFrame(all_data)

print("Final PySpark DataFrame created.")
df.printSchema()
print(f"Total rows in DataFrame: {df.count()}")

In [0]:
# Build the Spark DataFrame from the rows accumulated in `all_data`
# (repeats the createDataFrame call made at the end of the extraction cell).
df = spark.createDataFrame(all_data)

In [0]:
# Write `df` in Delta format to the given location, overwriting existing data.
# NOTE(review): the target is an https:// Databricks workspace "browse/folders"
# link, which looks like a UI URL rather than a storage path (e.g. dbfs:/...,
# abfss://..., /Volumes/...). Confirm the intended destination — this save is
# likely to fail as written.
df.write.mode("overwrite").format("delta").save(
    "https://adb-2279036150228288.8.azuredatabricks.net/browse/folders/3614051597394509?o=2279036150228288/"
)

In [0]:
# Persist `df` as Parquet under the relative path "df_parquet",
# replacing whatever a previous run wrote there.
df.write.parquet("df_parquet", mode="overwrite")

In [0]:
# Display the number of rows currently in `df`.
df.count()

In [0]:
# Print a preview of the first rows of `df` (show() uses its default row limit).
df.show()

In [0]:
# Display the row count of `df` again (repeats the earlier df.count() cell).
df.count()

In [0]:
# Ad-hoc single request against the v3 query endpoint for a fixed date range.
# On success `df` is rebuilt from the response; on failure the cell raises so
# the cells below do not silently keep using a stale `df`.
#
# SECURITY: the app token used to be hard-coded in this cell. It is now read
# from the config file; the previously exposed token should be rotated.
app_token = conf_file["app_token"]
start_date = "2023-01-01T00:00:00.000"
end_date = "2023-05-31T23:59:59.000"
# f-string so the dates are actually interpolated. The dangling "FROM" (no
# table name) is dropped: the dataset is already identified by the URL.
query = (
    "SELECT vendorid, passenger_count, total_amount, tpep_pickup_datetime, tpep_dropoff_datetime "
    f"WHERE tpep_pickup_datetime >= '{start_date}' AND tpep_dropoff_datetime <= '{end_date}'"
)

api_url = "https://data.cityofnewyork.us/api/v3/views/4b4i-vvec/query.json"
headers = {"X-App-Token": app_token}
# The query is sent in the JSON request body rather than as an HTTP header;
# `json=` also sets the Content-Type: application/json header.
response = requests.post(api_url, headers=headers, json={"query": query}, timeout=60)

if response.status_code == 200:
    data = response.json()
    df = spark.createDataFrame(data)
else:
    raise RuntimeError(
        f"Error fetching data: {response.status_code} {response.text[:500]}"
    )

In [0]:
# Display the number of rows in `df` as left by the request cell above.
df.count()

In [0]:
# Print a preview of the first rows of `df` (show() uses its default row limit).
df.show()

In [0]:
# Print the column names and types of `df`. The call parentheses are required:
# without them the cell only displays the bound-method object.
df.printSchema()

In [0]:
# Scratch cell: the incomplete expression `df.` (a SyntaxError that broke
# Restart & Run All) was removed. Delete this cell if it is not needed.