# Setting Up Spark

In [1]:
# Install necessary libraries
!pip install pyspark
!pip install python-dotenv



In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os
from pyspark.sql.functions import explode, col, to_timestamp, substring, lower, trim, countDistinct, when
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, ArrayType

In [3]:
# Load environment variables for Azure access information
load_dotenv("credentials.env")

storage_account_name = os.getenv("AZURE_ACCOUNT_NAME")
storage_account_key = os.getenv("AZURE_STORAGE_KEY")

# Fail fast when a credential is missing: os.getenv returns None for an unset variable,
# which would otherwise be formatted into the Spark configuration key as the text "None"
# and only surface later as an opaque authentication error.
_missing_azure_vars = [
    var_name
    for var_name, var_value in (
        ("AZURE_ACCOUNT_NAME", storage_account_name),
        ("AZURE_STORAGE_KEY", storage_account_key),
    )
    if not var_value
]
if _missing_azure_vars:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_azure_vars)
        + ". Define them in credentials.env or export them before starting the kernel."
    )

# Fixed (non-secret) storage locations
storage_container_name = "kaggle-datasets"
parquet_blob_name = "github-dataset-full.parquet"

In [4]:
# Build (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("Read Parquet from Azure Blob Storage")
    # Storage account key, handed to the Hadoop Azure file system via the spark.hadoop.* prefix
    .config(
        f"spark.hadoop.fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
        storage_account_key,
    )
    # Connector jars that provide the wasbs:// file system
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-azure:3.3.2,com.microsoft.azure:azure-storage:8.6.6",
    )
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# Only show ERROR-level log messages so cell output stays readable
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/opt/anaconda3/envs/naturalistvenv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/matthewleffler/.ivy2/cache
The jars for the packages stored in: /Users/matthewleffler/.ivy2/jars
org.apache.hadoop#hadoop-azure added as a dependency
com.microsoft.azure#azure-storage added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-48bb7c42-ff7d-4671-a2ed-b71269612e4c;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-azure;3.3.2 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in local-m2-cache
	found org.apache.httpcomponents#httpcore;4.4.13 in local-m2-cache
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.11 in local-m2-cache
	found org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in central
	found org.eclipse.jetty#jetty-util-ajax;9.4.43.v20210629 in central
	found org.eclipse.jetty#jetty-util;9.4.43.v20210629 in central
	found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in centr

In [5]:
# Set authentication for Spark to connect to Azure:
# register the storage account key on the running session's configuration.
spark.conf.set(
    key=f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    value=storage_account_key,
)

# Load Repo List Data to DataFrame

In [6]:
# Load the repo list dataset (clean_data/repo_list_data) from Azure Blob Storage.
# The location is built from the configured container and storage account so that it
# always matches the account whose key was registered with Spark above, instead of
# repeating a hard-coded account name.
repo_list_path = (
    f"wasbs://{storage_container_name}@{storage_account_name}"
    ".blob.core.windows.net/clean_data/repo_list_data"
)
repo_list_df = spark.read.parquet(repo_list_path)

                                                                                

In [7]:
# Normalise the spelling of a few languages that appear with inconsistent casing.
# The comparison is done on the lower-cased value; every other language is kept as-is.
language_lowercase = lower(col("repo_language"))
canonical_language = (
    when(language_lowercase == "fortran", "Fortran")
    .when(language_lowercase == "haxe", "Haxe")
    .when(language_lowercase == "ecl", "ECL")
    .otherwise(col("repo_language"))
)
repo_list_df = repo_list_df.withColumn("repo_language", canonical_language)

# Create Aggregated Data (Language Popularity by Month)

In [8]:
from pyspark.sql.functions import date_format

# Monthly language popularity: one row per (creation month, language) pair.
# NOTE(review): the output column is named "user_count", but it holds the number of
# repo_list_df rows in each group — presumably repositories rather than users; confirm.
language_popularity_df = (
    repo_list_df
    # Rows without a language cannot be attributed to any group
    .dropna(subset=["repo_language"])
    # Reduce the creation timestamp to a "yyyy-MM" bucket
    .withColumn("year_month", date_format(col("repo_created_at"), "yyyy-MM"))
    .groupBy("year_month", "repo_language")
    .count()
    .withColumnRenamed("count", "user_count")
)

In [9]:
# Preview the aggregated result (first 20 rows, long values truncated)
language_popularity_df.show(n=20, truncate=True)

                                                                                

+----------+----------------+----------+
|year_month|   repo_language|user_count|
+----------+----------------+----------+
|   2013-12|      JavaScript|     17245|
|   2015-11|          Python|     20173|
|   2014-11|            Ruby|      7054|
|   2016-07|           Scala|      1504|
|   2017-07|Jupyter Notebook|      7331|
|   2017-09|      TypeScript|      8720|
|   2018-01|         Gnuplot|        12|
|   2015-09|           Scala|      1275|
|   2018-03|          Rascal|       108|
|   2013-08|           Shell|      1815|
|   2017-05|      FreeMarker|        56|
|   2011-10|         Haskell|        50|
|   2015-04|               D|        41|
|   2014-11|        Assembly|       141|
|   2014-04|    CoffeeScript|       626|
|   2013-04|             CSS|       557|
|   2017-12|         Haskell|       669|
|   2018-07|          Kotlin|      1478|
|   2018-08|      Vim script|        74|
|   2014-12|             Lua|       457|
+----------+----------------+----------+
only showing top

In [10]:
# Write the aggregated data to the Azure container, replacing any previous output.
# The destination is built from the configured container and storage account so it
# matches the account whose key was registered with Spark, instead of a hard-coded name.
language_popularity_path = (
    f"wasbs://{storage_container_name}@{storage_account_name}"
    ".blob.core.windows.net/analytics/language_popularity"
)
language_popularity_df.write.mode("overwrite").parquet(language_popularity_path)

[Stage 4:>                                                         (0 + 8) / 19]

In [None]:
# Read the data back to ensure it was properly saved.
# The location is built from the configured container and storage account so it
# matches the account whose key was registered with Spark, instead of a hard-coded name.
language_popularity_path = (
    f"wasbs://{storage_container_name}@{storage_account_name}"
    ".blob.core.windows.net/analytics/language_popularity"
)
test_df = spark.read.parquet(language_popularity_path)

In [None]:
# Preview the re-read data (first 20 rows, long values truncated)
test_df.show(n=20, truncate=True)

In [None]:
# Print the schema (column names and types) of the re-read DataFrame
test_df.printSchema()

# Save Files to Snowflake

In [None]:
import snowflake.connector

# Snowflake credentials are read from environment variables; nothing is hard-coded here.
snowflake_credentials = {
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
}

# Fail fast with a clear message: os.getenv returns None for unset variables, and
# passing None to connect() would only fail later with a less helpful error.
_missing_snowflake_vars = [
    f"SNOWFLAKE_{field.upper()}"
    for field, value in snowflake_credentials.items()
    if not value
]
if _missing_snowflake_vars:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_snowflake_vars)
    )

# NOTE(review): ACCOUNTADMIN is a very broad role for a data-loading job — consider a
# role limited to the BIGDATA_GITHUB.ANALYTICS objects; confirm with the account owner.
conn = snowflake.connector.connect(
    **snowflake_credentials,
    warehouse='COMPUTE_WH',
    database='BIGDATA_GITHUB',
    schema='ANALYTICS',
    role='ACCOUNTADMIN'
)

In [None]:
# Create (or replace) the external stage that points Snowflake at the Parquet output in Azure.
# The URL must reference the same location the Parquet files were written to.
url = 'azure://matthewleffler1.blob.core.windows.net/kaggle-datasets/analytics/language_popularity/'

# Validate the SAS token up front: os.getenv returns None when the variable is unset,
# which would otherwise be embedded in the statement as the literal text 'None'.
azure_sas_token = os.getenv("AZURE_SAS_TOKEN")
if not azure_sas_token:
    raise RuntimeError("AZURE_SAS_TOKEN is not set; cannot create the Azure stage.")

cur = conn.cursor()

try:
    # CREATE STAGE is a DDL statement; Snowflake executes DDL as its own auto-committed
    # transaction, so wrapping it in BEGIN/COMMIT/ROLLBACK has no effect and is omitted.
    cur.execute(f"""
        CREATE OR REPLACE STAGE azure_parquet_stage_language_popularity
          URL = '{url}'
          CREDENTIALS = (
            AZURE_SAS_TOKEN = '{azure_sas_token}'
          )
          FILE_FORMAT = (TYPE = PARQUET);
        """)
    print("Successfully created stage.")
except Exception as e:
    print(f"Error creating database object: {e}")
    # Re-raise so a failed stage creation stops the run instead of letting the
    # following cells execute against a missing or stale stage.
    raise
finally:
    cur.close()


In [None]:
# Create the target table for the language-popularity data.
# CREATE OR REPLACE drops any existing table of the same name (and its rows) first.
cur = conn.cursor()
table_name = "LANGUAGE_POPULARITY_TABLE"

try:
    # CREATE TABLE is a DDL statement; Snowflake executes DDL as its own auto-committed
    # transaction, so wrapping it in BEGIN/COMMIT/ROLLBACK has no effect and is omitted.
    cur.execute(f"""
      CREATE OR REPLACE TABLE {table_name} (
          year_month STRING,
          repo_language STRING,
          user_count BIGINT
      );
          """)
    print("Table created successfully.")
except Exception as e:
    print(f"Error creating database object: {e}")
    # Re-raise so the load step does not run against a table that was not created.
    raise
finally:
    cur.close()

In [None]:
# Load the staged Parquet files into the target table inside an explicit transaction,
# so a failure part-way through leaves the table unchanged.
cur = conn.cursor()

try:
    cur.execute("BEGIN;")
    # PATTERN restricts the load to the Parquet part files; a Spark output directory
    # also contains non-Parquet marker files (e.g. _SUCCESS) that must not be parsed.
    cur.execute(f"""
      COPY INTO {table_name}
      FROM @azure_parquet_stage_language_popularity
      PATTERN = '.*[.]parquet'
      FILE_FORMAT = (TYPE = PARQUET)
      MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
      ON_ERROR = CONTINUE;
          """)
    # COPY INTO returns one result row per processed file (status, row counts, errors).
    copy_results = cur.fetchall()
    cur.execute("COMMIT;")
    print(f"Data loaded into {table_name} successfully.")
    # ON_ERROR = CONTINUE skips bad rows silently, so show the per-file outcome to make
    # partially loaded files visible.
    for file_result in copy_results:
        print(file_result)
except Exception as e:
    cur.execute("ROLLBACK;")
    print(f"Error loading data: {e}")
    # Re-raise so a failed load is not mistaken for a completed run.
    raise
finally:
    cur.close()