In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import to_date, regexp_replace, col, when
from pyspark.sql.types import DoubleType
from google.cloud import bigquery

In [2]:
credentials_location = "/home/elieba/.gc/alexy-de-bootcamp.json"
conf = SparkConf()\
    .setAppName("test")\
    .setMaster("spark://de-bootcamp.us-central1-b.c.alexy-de-bootcamp.internal:7077")\
    .set("spark.jars", "/home/elieba/spark/lib/spark-3.5-bigquery-0.42.0.jar,/home/elieba/spark/lib/gcs-connector-hadoop3-2.2.5.jar")\
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")\
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [3]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("google.cloud.auth.service.account.enable", "true")
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", credentials_location)



25/03/16 12:28:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark = SparkSession.builder\
    .config(conf=sc.getConf())\
    .getOrCreate()


In [5]:
spark

## Reading Data

In [12]:
df = spark.read\
    .option("header", True)\
    .csv("gs://project-egx-bucket/data/EGX-100_historical-data.csv")

In [13]:
df.show()

+----------+---------+---------+---------+---------+-------+--------+
|      Date|    Price|     Open|     High|      Low|   Vol.|Change %|
+----------+---------+---------+---------+---------+-------+--------+
|03/13/2025|12,209.73|12,115.24|12,231.83|12,115.24|  1.19B|   0.78%|
|03/12/2025|12,115.24|12,006.13|12,129.40|12,006.13|  1.12B|   0.91%|
|03/11/2025|12,006.13|11,968.97|12,006.13|11,909.95|748.99M|   0.31%|
|03/10/2025|11,968.97|11,983.17|12,067.38|11,958.60|680.35M|  -0.12%|
|03/09/2025|11,983.17|11,867.56|12,016.53|11,867.56|881.74M|   0.97%|
|03/06/2025|11,867.56|11,840.45|11,872.73|11,840.45|698.42M|   0.23%|
|03/05/2025|11,840.45|11,934.24|11,944.43|11,817.58|774.32M|  -0.79%|
|03/04/2025|11,934.24|11,967.75|11,986.40|11,922.81|756.93M|  -0.28%|
|03/03/2025|11,967.75|11,935.95|11,994.63|11,935.95|798.81M|   0.27%|
|03/02/2025|11,935.95|11,910.97|11,970.31|11,910.97|592.19M|   0.21%|
|02/27/2025|11,910.97|11,905.04|11,970.62|11,901.84|  1.18B|   0.05%|
|02/26/2025|11,905.0

In [14]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Date: string, Price: string, Open: string, High: string, Low: string, Vol.: string, Change %: string]>

## Transforming Data

In [None]:

df = df.withColumn("Date", to_date(col("Date"), "MM/dd/yyyy"))  # Convert Date
df = df.withColumn("Close", regexp_replace(col("Price"), ",", "").cast(DoubleType()))
df = df.withColumn("Open", regexp_replace(col("Open"), ",", "").cast(DoubleType()))
df = df.withColumn("High", regexp_replace(col("High"), ",", "").cast(DoubleType()))
df = df.withColumn("Low", regexp_replace(col("Low"), ",", "").cast(DoubleType()))
df = df.drop("Price")
df = df.withColumn("Points_diff", df.Close - df.Open)


df = df.withColumn("Volume", 
    when(col("`Vol.`").endswith("B"), 
         regexp_replace(col("`Vol.`"), "B", "").cast(DoubleType()) * 1e9)
    .when(col("`Vol.`").endswith("M"), 
          regexp_replace(col("`Vol.`"), "M", "").cast(DoubleType()) * 1e6)
    .otherwise(col("`Vol.`").cast(DoubleType()))
)
df = df.drop("Vol.")

df = df.withColumn("Change %", 
    regexp_replace(col("`Change %`"), "%", "").cast(DoubleType())
)

In [16]:
df.show()

+----------+--------+--------+--------+--------+--------+-------------------+--------------------+
|      Date|    Open|    High|     Low|Change %|   Close|        Points_diff|              Volume|
+----------+--------+--------+--------+--------+--------+-------------------+--------------------+
|2025-03-13|12115.24|12231.83|12115.24|    0.78|12209.73|  94.48999999999978|              1.19E9|
|2025-03-12|12006.13| 12129.4|12006.13|    0.91|12115.24| 109.11000000000058|              1.12E9|
|2025-03-11|11968.97|12006.13|11909.95|    0.31|12006.13| 37.159999999999854|            7.4899E8|
|2025-03-10|11983.17|12067.38| 11958.6|   -0.12|11968.97|-14.200000000000728|            6.8035E8|
|2025-03-09|11867.56|12016.53|11867.56|    0.97|11983.17| 115.61000000000058|            8.8174E8|
|2025-03-06|11840.45|11872.73|11840.45|    0.23|11867.56| 27.109999999998763|            6.9842E8|
|2025-03-05|11934.24|11944.43|11817.58|   -0.79|11840.45| -93.78999999999905|            7.7432E8|
|2025-03-0

                                                                                

## Load To BigQuery

In [17]:
df.write.parquet('gs://project-egx-bucket/data/egx_history', mode='overwrite')

                                                                                

In [None]:


client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition="WRITE_TRUNCATE",  
)

uri = "gs://project-egx-bucket/data/egx_history/*.parquet"
table_id = "alexy-de-bootcamp.EGX_dataset.egx_history"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)
load_job.result() 

print(f"Loaded {load_job.output_rows} rows to {table_id}")

Loaded 4651 rows to alexy-de-bootcamp.EGX_dataset.egx_history
