<a href="https://colab.research.google.com/github/palvisha13/SparkDemo/blob/main/HackerVersion_PySparkDemo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HTN 2024 Spark Workshop: PySpark Demo

#### This notebook accompanies the HTN 2024 Spark Workshop demo. In the demo below, you will set Spark configuration parameters, write your own ETL pipeline using PySpark, and learn how to use Spark for your own data needs!

# 1. Installing PySpark to use with the Google Colab notebook

In [None]:
""" Here we will be installing the dependencies needed to set up a Spark notebook.
Note: although we are working with a Jupyter notebook here, you can work with Spark on your own machine as well.
The notebook helps facilitate the workshop and keeps the dependencies for your environment setup minimal.
"""

# install Java in Google Colab: Spark runs on the JVM (it is written mainly in Scala), and PySpark drives it through a JVM gateway, hence this Java installation
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install pyspark: the pyspark package provides the Python API for Spark
!pip install pyspark

# libraries that give access to the Google Colab environment so we can set an environment variable
# pointing to the Java installation from above

import os
import sys

# environment variable pointing to the java installation location in the google colab environment (from above)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
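
A quick sanity check before starting Spark can save a confusing error later: if `JAVA_HOME` points at a directory that does not exist, the JVM will not launch. The sketch below is only an illustration. The helper name `java_home_is_valid` is hypothetical (it is not part of PySpark), and it only confirms that a `bin/java` file is present under the given directory.

```python
import os


def java_home_is_valid(java_home):
    """Return True if java_home looks like a usable Java installation.

    Hypothetical helper for illustration: it only checks that the
    directory contains bin/java; it does not verify the Java version.
    """
    if not java_home:
        return False
    return os.path.isfile(os.path.join(java_home, "bin", "java"))


# In Colab, after the cell above has run:
# java_home_is_valid(os.environ.get("JAVA_HOME"))
```

If the check returns `False`, the install step above most likely failed or placed Java in a different directory.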


In [None]:
""" Import pyspark and pyspark functions that we will be using
"""
import pyspark

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [None]:
""" Initializing a Spark Session"""
spark = SparkSession \
       .builder \
       .appName("HTN 2024 Spark Demo") \
       .getOrCreate()

# 2. Connect to Data Storage Location on Google Colab

In [None]:
from google.colab import drive
drive.mount('/content/drive/')
root = "/content/drive/MyDrive/"

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


# 3. Initialize schemas to read in the data

In [None]:
""" We have 3 related datasets for which we will create a pipeline.
The code below sets up a schema for each dataset so that every column has an explicit data type.
This keeps the data consistent and makes data validation errors easier to catch.
"""

# Define schema for Tardigrade Taxonomy

taxonomySchema = StructType([
    StructField("gbifID", StringType(), True),
    StructField("individual", StringType(), True),
    StructField("kingdom", StringType(), True),
    StructField("phylum", StringType(), True),
    StructField("class", StringType(), True),
    StructField("order", StringType(), True),
    StructField("family", StringType(), True),
    StructField("genus", StringType(), True),
    StructField("species", StringType(), True),
    StructField("infraspecificEpithet", StringType(), True),
    StructField("scientificName", StringType(), True),
    StructField("taxonKey", StringType(), True),
    StructField("speciesKey", StringType(), True)
])



# Define schema for Tardigrade Occurrences




# Define schema for Tardigrade Occurrence Locations
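
When every column is a nullable string, as in the taxonomy schema above, the schema can also be written as a DDL-formatted string, which `spark.read.csv` accepts in its `schema` argument. The sketch below builds such a string from a list of column names. The helper name `ddl_schema` is hypothetical, and the three column names are taken from the location fields that appear later in this notebook; check them against the header of your own file.

```python
def ddl_schema(column_names, spark_type="STRING"):
    """Build a DDL schema string that gives every column the same type.

    Backticks quote each name so that a column such as `order`,
    which collides with a SQL keyword, stays valid.
    """
    return ", ".join(f"`{name}` {spark_type}" for name in column_names)


# Illustrative column list; use your dataset's real header.
location_columns = ["gbifID", "countryCode", "locality"]
location_ddl = ddl_schema(location_columns)
print(location_ddl)
# `gbifID` STRING, `countryCode` STRING, `locality` STRING

# Assumed usage once a SparkSession exists (path is a placeholder):
# location_df = spark.read.csv(path, header=True, schema=location_ddl)
```

A `StructType` remains the better choice when columns need different types or nullability; the DDL form is just shorter when they are all alike.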



# 4. Read in the data with the above defined schemas

In [None]:
""" Read in each of the datasets above as PySpark Dataframes,
 using the respective schemas"""

import os

# List the contents of the mounted drive to verify the path - this will tell you where your data is stored
os.listdir('/content/drive/MyDrive/')

# taxonomy_df = spark.read.csv(root + "Tardigrade_Data/Tardigrada_Taxonomy.csv", header=True, schema=taxonomySchema)

# ocurrence_df =

# location_df =

# print("INFO: All Tardigrade data has been read.")

INFO: All Tardigrade data has been read.


# 5. Creating a larger, comprehensive dataset

In [None]:
""" In this section, we will create a larger, comprehensive dataset from the
three separate datasets above. For the comprehensive dataset, we define another schema
to enforce explicit data types and ensure consistency."""

tardigradeSchema = StructType([
    StructField("gbifID", StringType(), True),
    StructField("datasetKey", StringType(), True),
    StructField("occurrenceID", StringType(), True),
    StructField("individualCount", StringType(), True),
    StructField("publishingOrgKey", StringType(), True),
    StructField("eventDate", StringType(), True),
    StructField("day", StringType(), True),
    StructField("month", StringType(), True),
    StructField("year", StringType(), True),
    StructField("basisOfRecord", StringType(), True),
    StructField("individual", StringType(), True),
    StructField("kingdom", StringType(), True),
    StructField("phylum", StringType(), True),
    StructField("class", StringType(), True),
    StructField("order", StringType(), True),
    StructField("family", StringType(), True),
    StructField("genus", StringType(), True),
    StructField("species", StringType(), True),
    StructField("infraspecificEpithet", StringType(), True),
    StructField("scientificName", StringType(), True),
    StructField("taxonKey", StringType(), True),
    StructField("speciesKey", StringType(), True),
    StructField("countryCode", StringType(), True),
    StructField("locality", StringType(), True),
    StructField("stateProvince", StringType(), True),
    StructField("decimalLatitude", StringType(), True),
    StructField("decimalLongitude", StringType(), True)


])


# Determining the Primary Key for the Dataset


""" Join the 3 datasets on the primary key. First we need an ID column
that contains only unique values. The first and most common ID column is gbifID, which is
likely our primary key, but we can check for uniqueness to confirm."""


total_records = ocurrence_df.count()

# column_count =


if total_records == column_count:
  print("{} can be a primary key".format(ocurrence_df.columns[0]))


# Joining the datasets together on that Primary Key

# tardigrade_df = ocurrence_df.join(taxonomy_df, "gbifID", "inner").join(OTHER DATASET)

print("INFO: The datasets have been joined together.")

# Let's see the first 5 rows of the data
tardigrade_df.show(5, False)



gbifID can be a primary key
INFO: The datasets have been joined together.
+---------+------------------------------------+--------------------------+---------------+------------------------------------+-----------------+---+-----+----+-----------------+---------------+--------+----------+----------------+----------------+--------------+------------+-------------------------+--------------------+----------------------------------------+--------+----------+-----------+--------+-------------+---------------+----------------+
|gbifID   |datasetKey                          |occurrenceID              |individualCount|publishingOrgKey                    |eventDate        |day|month|year|basisOfRecord    |individualCount|kingdom |phylum    |class           |order           |family        |genus       |species                  |infraspecificEpithet|scientificName                          |taxonKey|speciesKey|countryCode|locality|stateProvince|decimalLatitude|decimalLongitude|
+---------+-------
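
The primary-key check used in this section boils down to comparing the total row count with the number of distinct values in the candidate column. The sketch below wraps that comparison in a small function. The name `is_primary_key` is hypothetical, and the function assumes `df` is a PySpark DataFrame (it only calls `count`, `select`, and `distinct`).

```python
def is_primary_key(df, column):
    """Return True when `column` has one distinct value per row of `df`.

    `df` is expected to be a PySpark DataFrame; only its `count`,
    `select`, and `distinct` methods are used.
    """
    total_records = df.count()
    distinct_values = df.select(column).distinct().count()
    return total_records == distinct_values


# Assumed usage with the occurrence data from this notebook:
# if is_primary_key(ocurrence_df, "gbifID"):
#     print("gbifID can be a primary key")
```

Note that `distinct` counts null as a value, so a column with a single null would still pass this test; the null checks in the next section cover that case.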

# 6. Data Validation


In [None]:
"""
This data validation step is to ensure that all data types match the expected schema
and that the data is consistent and expected. The code above should complete schema validation as it will
catch any errors with data that does not match the defined schema"""


# Null Checks


""" We never want the primary key column, gbifID, to be null. We also don't want "species",
"speciesKey", or "datasetKey" to be null, since the data is labelled for Tardigrades, so we will check these columns.
We also do not want "countryCode" to be null: it is the most general location field, and losing it
removes a good chunk of important information."""

# tardigrade_null = tardigrade_df.filter("gbifID is null or species is null or [filter] or [filter] or [filter]").count()

# The only column required to be unique is our primary key: gbifID which has already been checked



The occurrenceID column is not distinct.
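
The null check needs to reach `filter` as one SQL condition string, because Python's `or` applied to two non-empty strings simply evaluates to the first string. The sketch below builds that condition from the list of required columns named in this section. The helper name `null_filter` is hypothetical, and the commented usage assumes the joined `tardigrade_df` exists.

```python
def null_filter(columns):
    """Build a SQL condition that is true when any listed column is null."""
    return " OR ".join(f"`{name}` IS NULL" for name in columns)


# Columns this section says should never be null.
required_columns = ["gbifID", "species", "speciesKey", "datasetKey", "countryCode"]
condition = null_filter(required_columns)
print(condition)

# Assumed usage with the joined DataFrame from section 5:
# tardigrade_null = tardigrade_df.filter(condition).count()
```

A count of zero means none of the required columns contains a null; anything higher is the number of rows that need attention.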


# 7. Tune Spark Configuration Parameters


In [None]:
# Many Spark parameters (such as executor memory) cannot be changed once a Spark Session has started, so stop the current session first

spark.stop()

# Set up a new SparkSession with new config parameters
spark = (
    SparkSession.builder
    .appName("GoogleColabSpark")
    .config("spark.executor.memory", "2g")
    # ADD MORE CONFIG PARAMETERS HERE
    .getOrCreate()
)

# 8. Explore and play around with the new Data!

In [None]:
""" Take this space to create cool visualizations or ML models from the data we have
processed above. The world is your oyster!"""





