In [0]:
# Root folder of the assignment; the cells below derive their bronze input and
# silver output locations from it via f"{path}/data_storage/...".
# NOTE(review): this is a hard-coded, user-specific absolute path -- presumably
# it has to be edited when the notebook is run by another user; confirm.
path = "/Workspace/Users/a845678@asb.dtcbtndsie.onmicrosoft.com/assignment"

### **Stores-silver**

In [0]:
import pandas as pd
from pandas import json_normalize


# Step 1: Load the bronze stores JSON file into a pandas DataFrame
input_path = f"{path}/data_storage/1_bronze/src_stores/bronze_stores.json"
output_path = f"{path}/data_storage/2_silver/silver_stores"
file_name = "stores"

data = pd.read_json(input_path)

# Step 2: Flatten the nested locales -- one output row per (store, locale) pair.
# Each store row carries a `locales` mapping of locale key -> locale details;
# the store-level fields (id, lat, lng) are repeated on every locale row.
flattened_data = []
# The store id is bound to `store_id`, not `id`: a top-level notebook loop
# variable named `id` would shadow the builtin id() for every later cell.
store_columns = ["id", "lat", "lng", "locales"]
for store_id, lat, lng, locales in data[store_columns].itertuples(index=False, name=None):
    for locale_key, locale_value in locales.items():
        # displayName, displayNameAlternate and address are required keys
        # (KeyError if absent); the individual address fields are optional
        # and fall back to None.
        address = locale_value["address"]
        flattened_data.append(
            {
                "id": store_id,
                "lat": lat,
                "lng": lng,
                "locale": locale_key,
                "displayName": locale_value["displayName"],
                "displayNameAlternate": locale_value["displayNameAlternate"],
                "street": address.get("street"),
                "zipCode": address.get("zipCode"),
                "city": address.get("city"),
                "timezone": address.get("timezone"),
                "stateProvinceCode": address.get("stateProvinceCode"),
                "displayAddress": address.get("displayAddress"),
            }
        )

# Build the flat DataFrame in one go from the collected records
df_flattened = pd.DataFrame(flattened_data)

# Step 3: Drop fully identical rows
silver_stores = df_flattened.drop_duplicates()

# Step 4: Save to the silver layer as CSV (without the pandas index column)
silver_stores.to_csv(f"{output_path}/silver_{file_name}.csv", index=False)

print(f"CSV file saved successfully at {output_path}/silver_{file_name}.csv")

### **Big-mac-silver**

In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Bronze input folder (one sub-folder per batch) and silver output folder
input_path = f"{path}/data_storage/1_bronze/src_economist"
output_path = f"{path}/data_storage/2_silver/silver_economist"
file_name = "big-mac-data"

batch_paths = [
    f"{input_path}/batch_1/big-mac-source-data-v2_1.csv",
    f"{input_path}/batch_2/big-mac-source-data-v2_2.csv",
    f"{input_path}/batch_3/big-mac-source-data-v2_3.csv",
]
spark = SparkSession.builder.appName("Example").getOrCreate()

# Target schema of the silver table
schema = StructType([
    StructField("name", StringType(), True),
    StructField("iso_a3", StringType(), True),
    StructField("currency_code", StringType(), True),
    StructField("local_price", DoubleType(), True),
    StructField("dollar_ex", DoubleType(), True),
    StructField("GDP_dollar", DoubleType(), True),
    StructField("GDP_local", DoubleType(), True),
    StructField("date", StringType(), True)
])

# Create empty spark dataframe with specified schema to which we will append each batch
silver_big_mac = spark.createDataFrame(data=[], schema=schema)

# The loop variable is deliberately NOT called `path`: that name is the
# notebook-wide root folder, and rebinding it here would make every cell that
# is run (or re-run) afterwards build its paths from the last batch CSV file.
for batch_path in batch_paths:

    # Read the batch with pandas, then convert it to a PySpark DataFrame
    raw_data = spark.createDataFrame(pd.read_csv(batch_path))

    # Apply transformations: duplicates are removed within this batch only;
    # rows repeated across different batches are kept.
    cleaned_data = raw_data.dropDuplicates()

    # Append data. union() matches columns by position, not by name, so every
    # batch file must list its columns in the same order as `schema`.
    silver_big_mac = silver_big_mac.union(cleaned_data)

# Write to Silver layer. index=False keeps the meaningless pandas row index out
# of the CSV, consistent with how the stores silver file is written.
silver_big_mac.toPandas().to_csv(f"{output_path}/silver_{file_name}.csv", index=False)
