# PySpark Introduction

Spark is a distributed data processing framework designed for working with large datasets, including ones that don't fit into a single machine's memory. At its core, Spark uses RDDs (Resilient Distributed Datasets) to offer an interface for users to interact with and perform transformations on their data; the DataFrame API used in this notebook is built on top of them. Spark offers APIs in several languages, such as Scala, Java, SparkR for R, and PySpark for Python, which is the API we will be using in this notebook.

First, we will import the library and create a SparkSession, the entry point to our Spark application. We will also specify the path to the CSV file we will be working with.

In [12]:
from pyspark.sql import SparkSession

PATH = r'/home/tom/Documents/csv_files/pokemon.csv'
spark = (SparkSession.builder
         .appName('pyspark_introduction')
         .config("spark.ui.port", "4050")  # serve the Spark web UI on port 4050
         .getOrCreate())

Now that we have created our SparkSession and specified the path to our CSV file, we can load the dataset with the .read.csv() method and begin to explore it. We will also specify a schema to tell Spark which columns and data types to expect. Alternatively, the inferSchema parameter lets Spark infer the schema automatically, but this requires an extra pass over the data and can guess column types incorrectly, so it is best practice to specify the schema explicitly.

In [28]:
schema = """pokemon_id INTEGER, name STRING, type_1 STRING, type_2 STRING,
total INTEGER, HP INTEGER, attack INTEGER, defense INTEGER, special_attack INTEGER, special_defense INTEGER, 
speed INTEGER, generation INTEGER, legendary BOOLEAN"""

df = spark.read.csv(PATH, schema = schema)

We can now perform some basic transformations on our dataset. We will drop the total column and recompute it ourselves to show how the .withColumn() method works.

In [18]:
df = df.drop('total')
# recompute total as the sum of the six base stats
df = df.withColumn('total', df.attack + df.defense + df.special_attack + df.special_defense + df.speed + df.HP)
# use .select() to choose individual columns and .show() to display the result
df.select("name", "total").show()

+--------------------+-----+
|                name|total|
+--------------------+-----+
|                Name| null|
|           Bulbasaur|  318|
|             Ivysaur|  405|
|            Venusaur|  525|
|VenusaurMega Venu...|  625|
|          Charmander|  309|
|          Charmeleon|  405|
|           Charizard|  534|
|CharizardMega Cha...|  634|
|CharizardMega Cha...|  634|
|            Squirtle|  314|
|           Wartortle|  405|
|           Blastoise|  530|
|BlastoiseMega Bla...|  630|
|            Caterpie|  195|
|             Metapod|  205|
|          Butterfree|  395|
|              Weedle|  195|
|              Kakuna|  205|
|            Beedrill|  395|
+--------------------+-----+
only showing top 20 rows



Now that we understand how to apply some basic transformations to our dataset, we can begin to experiment with the many functions in the PySpark SQL module to extract information from our dataset and perform further transformations.

We will import the pyspark.sql.functions module, which contains numerous functions that can be applied to DataFrame columns. We will use its regexp_replace() function to replace occurrences of Bug with Stone in the type_1 column, then group the rows by type_1 and compute the average HP of each group.

In [36]:
import pyspark.sql.functions as sql
# replace every match of the pattern 'Bug' in type_1 with 'Stone'
df = df.withColumn('type_1', sql.regexp_replace('type_1', 'Bug', 'Stone'))
groups = df.groupBy('type_1')
aggregate_data = groups.agg({'HP': 'mean'}) #dict form {column: function}; produces a column named avg(HP)
aggregate_data = aggregate_data.withColumnRenamed('avg(HP)', 'average_hp') #renaming column
aggregate_data.show() #returns a DataFrame with the average HP value per type_1 value

+--------+-----------------+
|  type_1|       average_hp|
+--------+-----------------+
|   Water|          72.0625|
|  Poison|            67.25|
|   Steel|65.22222222222223|
|    Rock|65.36363636363636|
|     Ice|             72.0|
|   Ghost|          64.4375|
|   Fairy|74.11764705882354|
| Psychic|70.63157894736842|
|  Dragon|          83.3125|
|  Flying|            70.75|
|Electric|59.79545454545455|
|    Fire|69.90384615384616|
|   Stone|56.88405797101449|
|  Type 1|             null|
|  Ground|         73.78125|
|    Dark|66.80645161290323|
|Fighting|69.85185185185185|
|   Grass|67.27142857142857|
|  Normal|77.27551020408163|
+--------+-----------------+



Finally, we will export our transformed DataFrame to our local machine. Spark can write DataFrames to CSV files, to relational databases (RDBMSs) through JDBC drivers, and to Parquet, the format we will use here via the .write.parquet() method. Note that Spark writes Parquet output as a directory of part files rather than as a single file.

In [37]:
# Spark creates a directory at this path and writes part files into it
df.write.parquet('/home/tom/Documents/csv_files/pokemon_parquet.parquet')

# Summary

In this notebook, we learnt how to load a CSV file into Spark using the .read.csv() method, how to specify a schema for our dataset, how to apply basic transformations with .drop() and .withColumn(), how to replace values and compute aggregates with the pyspark.sql.functions module and .groupBy(), and finally how to export our DataFrame as Parquet to our local machine.