In [12]:
pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-18.1.0-cp313-cp313-win_amd64.whl.metadata (3.4 kB)
Downloading pyarrow-18.1.0-cp313-cp313-win_amd64.whl (25.1 MB)
   ---------------------------------------- 0.0/25.1 MB ? eta -:--:--
   - -------------------------------------- 1.0/25.1 MB 8.6 MB/s eta 0:00:03
   ----- ---------------------------------- 3.7/25.1 MB 10.7 MB/s eta 0:00:02
   -------- ------------------------------- 5.5/25.1 MB 10.8 MB/s eta 0:00:02
   ------------ --------------------------- 8.1/25.1 MB 10.4 MB/s eta 0:00:02
   ---------------- ----------------------- 10.5/25.1 MB 10.8 MB/s eta 0:00:02
   ------------------- -------------------- 12.3/25.1 MB 10.6 MB/s eta 0:00:02
   ---------------------- ----------------- 14.4/25.1 MB 10.1 MB/s eta 0:00:02
   -------------------------- ------------- 16.8/25.1 MB 10.3 MB/s eta 0:00:01
   ------------------------------- -------- 19.7/25.1 MB 10.7 MB/s eta 0:00:01
   ---------------------------------- ----- 21.8/25.1 MB 10.8 MB/s et

In [None]:
import pandas as pd
import json
import os

bronze_file = "../bronze/bronze_data.json"

#read bronze table
with open(bronze_file, "r") as file:
    bronze_data = [json.loads(line) for line in file]

silver_file = "silver_table.parquet"

#convert json into dataframe and normalize data
def convert_to_silver(bronze_data):
    silver_data = []

    for request in bronze_data:
        #meta columns
        ingestion_date_from = request["meta"]["date_from"]
        ingestion_date_to = request["meta"]["date_to"]
        symbol = request["symbol"]
        
        #data for each date
        for daily_data in request["data"]:
            silver_data.append({
                "ingestion_date_from": ingestion_date_from,
                "ingestion_date_to": ingestion_date_to,
                "symbol": symbol,
                "date": daily_data["date"],
                "open": daily_data["open"],
                "high": daily_data["high"],
                "low": daily_data["low"],
                "close": daily_data["close"],
                "volume": daily_data["volume"]
            })
    
    #save as dataframe
    silver_df = pd.DataFrame(silver_data)

    #convert types
    silver_df["date"] = pd.to_datetime(silver_df["date"])
    silver_df["ingestion_date_to"] = pd.to_datetime(silver_df["ingestion_date_to"])
    silver_df["ingestion_date_from"] = pd.to_datetime(silver_df["ingestion_date_from"])
    silver_df['symbol'] = silver_df['symbol'].astype('string')

    return silver_df


#add new rows to silver table
def update_silver(updated_data, silver_file):
    #check if file exists
    if os.path.exists(silver_file):
        print(f"{silver_file} exists. Reading it...")
        current_data = pd.read_parquet(silver_file)
    else:
        print(f"{silver_file} not found. Creating Dataframe")
        current_data = pd.DataFrame(columns=updated_data.columns)
    
    #add new rows and treat duplicates
    all_data = pd.concat([current_data, updated_data])
    all_data = all_data.drop_duplicates()
    
    #save table as parquet
    all_data.to_parquet(silver_file)
    
    new_rows = all_data[~all_data.index.isin(current_data.index)]
    if not new_rows.empty:
        print("New rows added to the silver table!")
        print(new_rows)
    else:
        print("No new rows have been added to the silver table.")


#convert bronze json into silver format (not very efficient, as it loads everything from the bronze table every time)
converted_bronze_data = convert_to_silver(bronze_data)

#save new rows to silver table
update_silver_table = update_silver(converted_bronze_data, silver_file)

silver_table.parquet not found. Creating Dataframe


  all_data = pd.concat([current_data, updated_data])


New rows added to the silver table!
    ingestion_date_from ingestion_date_to symbol                      date  \
0            2024-11-20        2024-12-01   ABEV  2024-11-29T00:00:00.000Z   
1            2024-11-20        2024-12-01   ABEV  2024-11-27T00:00:00.000Z   
2            2024-11-20        2024-12-01   ABEV  2024-11-26T00:00:00.000Z   
3            2024-11-20        2024-12-01   ABEV  2024-11-25T00:00:00.000Z   
4            2024-11-20        2024-12-01   ABEV  2024-11-22T00:00:00.000Z   
..                  ...               ...    ...                       ...   
125          2024-11-30        2024-12-12   ERIC  2024-12-06T00:00:00.000Z   
126          2024-11-30        2024-12-12   ERIC  2024-12-05T00:00:00.000Z   
127          2024-11-30        2024-12-12   ERIC  2024-12-04T00:00:00.000Z   
128          2024-11-30        2024-12-12   ERIC  2024-12-03T00:00:00.000Z   
129          2024-11-30        2024-12-12   ERIC  2024-12-02T00:00:00.000Z   

     open  high   low  clos

In [14]:
silver_df_check = pd.read_parquet(silver_file)

print(silver_df_check)

    ingestion_date_from ingestion_date_to symbol                      date  \
0            2024-11-20        2024-12-01   ABEV  2024-11-29T00:00:00.000Z   
1            2024-11-20        2024-12-01   ABEV  2024-11-27T00:00:00.000Z   
2            2024-11-20        2024-12-01   ABEV  2024-11-26T00:00:00.000Z   
3            2024-11-20        2024-12-01   ABEV  2024-11-25T00:00:00.000Z   
4            2024-11-20        2024-12-01   ABEV  2024-11-22T00:00:00.000Z   
..                  ...               ...    ...                       ...   
125          2024-11-30        2024-12-12   ERIC  2024-12-06T00:00:00.000Z   
126          2024-11-30        2024-12-12   ERIC  2024-12-05T00:00:00.000Z   
127          2024-11-30        2024-12-12   ERIC  2024-12-04T00:00:00.000Z   
128          2024-11-30        2024-12-12   ERIC  2024-12-03T00:00:00.000Z   
129          2024-11-30        2024-12-12   ERIC  2024-12-02T00:00:00.000Z   

     open  high   low  close   volume  
0    2.06  2.14  2.05  

In [21]:
print(silver_df_check.dtypes)

ingestion_date_from     object
ingestion_date_to       object
symbol                  object
date                    object
open                   float64
high                   float64
low                    float64
close                  float64
volume                   int64
dtype: object
