### Spark Initialization

In [46]:
import findspark
# Run before the pyspark imports below: findspark.init() makes a local Spark
# installation importable from this kernel.
findspark.init()

import pyspark
# from pyspark.sql import *
# Import SparkSession explicitly; without it the builder call below raises
# NameError on a fresh kernel (the wildcard import above is commented out).
from pyspark.sql import SparkSession
import pyspark.sql.functions as func

# Reuse the active session if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

### Load Data

##### CSV & JSON

In [33]:
# Load both CSV files with identical reader settings. header=True takes the
# column names from the first line of each file; no schema is supplied.
pokemon_df, combats_df = (
    spark.read.csv(csv_path, header=True)
    for csv_path in ("pokemon.csv", "combats.csv")
)
# JSON files are read through the same reader object:
# spark.read.json("data/Posts.json")

##### Parquet

In [None]:
# Read a dataset with the generic load(); no format is named explicitly here.
posts_df = spark.read.load("posts.parquet")
# Write a DataFrame out as Parquet, overwriting anything already at that path.
# NOTE(review): answers_df is not defined in any visible cell, so this line
# raises NameError on a fresh kernel — TODO define it (or write posts_df).
answers_df.write.mode('overwrite').parquet("answers.parquet")

### Actions

In [23]:
# Action: print the first 3 rows as a text table.
pokemon_df.show(n=3)

+---+---------+-------+-------+---+------+-------+-------+-------+-----+---------+
|pid|     Name|Class 1|Class 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Legendary|
+---+---------+-------+-------+---+------+-------+-------+-------+-----+---------+
|  1|Bulbasaur|  Grass| Poison| 45|    49|     49|     65|     65|   45|    FALSE|
|  2|  Ivysaur|  Grass| Poison| 60|    62|     63|     80|     80|   60|    FALSE|
|  3| Venusaur|  Grass| Poison| 80|    82|     83|    100|    100|   80|    FALSE|
+---+---------+-------+-------+---+------+-------+-------+-------+-----+---------+
only showing top 3 rows



In [24]:
# Action: return the number of rows in the DataFrame.
pokemon_df.count()

800

In [25]:
# describe() returns a new DataFrame (a 'summary' column plus one string column
# per input column), so on its own the cell only displays that DataFrame's
# repr. Chain .show() onto it to actually print the summary rows.
pokemon_df.describe()

DataFrame[summary: string, pid: string, Name: string, Class 1: string, Class 2: string, HP: string, Attack: string, Defense: string, Sp. Atk: string, Sp. Def: string, Speed: string, Legendary: string]

In [30]:
# Print each column's name, type and nullability as a tree.
pokemon_df.printSchema()

root
 |-- pid: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Class 1: string (nullable = true)
 |-- Class 2: string (nullable = true)
 |-- HP: string (nullable = true)
 |-- Attack: string (nullable = true)
 |-- Defense: string (nullable = true)
 |-- Sp. Atk: string (nullable = true)
 |-- Sp. Def: string (nullable = true)
 |-- Speed: string (nullable = true)
 |-- Legendary: string (nullable = true)



### Maps

In [89]:
# Casting: replace the 'Speed' column with a float version of itself.
pokemon_df = pokemon_df.withColumn('Speed', pokemon_df.Speed.cast("float"))

# Combine columns: add an 'Atk+Def' column holding Attack + Defense.
# NOTE(review): only 'Speed' has been cast in this cell; if Attack and Defense
# are still string columns the sum relies on Spark's implicit numeric
# coercion — consider casting them first.
_ = pokemon_df.withColumn('Atk+Def', pokemon_df.Attack + pokemon_df.Defense)

# Filter rows: keep those matching a SQL expression string.
_ = pokemon_df.filter('Attack == 49')

# Filter columns: keep only the listed ones.
_ = pokemon_df.select(['pid', 'Attack', 'Defense'])

# Filter duplicates: drop rows that are identical in every column.
_ = pokemon_df.distinct()

# Join two dataframes: match each combat's Winner to the pokemon's pid.
_ = combats_df.join(pokemon_df, on=combats_df['Winner'] == pokemon_df['pid'])

# Group by: one row per 'Class 1' value with the mean of 'Attack'.
_ = (
    pokemon_df
    .groupBy("Class 1")
    .agg(func.mean("Attack").alias("Mean Attack"))
)

# Subsample: a 10% random sample without replacement (no seed is set, so the
# selected rows differ between runs).
_ = pokemon_df.sample(withReplacement=False, fraction=0.1)

DataFrame[pid: string, Name: string, Class 1: string, Class 2: string, HP: string, Attack: string, Defense: string, Sp. Atk: string, Sp. Def: string, Speed: string, Legendary: string]

In [83]:
# With no argument, show() prints only the leading rows (20 by default).
pokemon_df.show()

+---+----------------+-------+-------+---+------+-------+-------+-------+-----+---------+
|pid|            Name|Class 1|Class 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Legendary|
+---+----------------+-------+-------+---+------+-------+-------+-------+-----+---------+
|  1|       Bulbasaur|  Grass| Poison| 45|    49|     49|     65|     65|   45|    FALSE|
|  2|         Ivysaur|  Grass| Poison| 60|    62|     63|     80|     80|   60|    FALSE|
|  3|        Venusaur|  Grass| Poison| 80|    82|     83|    100|    100|   80|    FALSE|
|  4|   Mega Venusaur|  Grass| Poison| 80|   100|    123|    122|    120|   80|    FALSE|
|  5|      Charmander|   Fire|   null| 39|    52|     43|     60|     50|   65|    FALSE|
|  6|      Charmeleon|   Fire|   null| 58|    64|     58|     80|     65|   80|    FALSE|
|  7|       Charizard|   Fire| Flying| 78|    84|     78|    109|     85|  100|    FALSE|
|  8|Mega Charizard X|   Fire| Dragon| 78|   130|    111|    130|     85|  100|    FALSE|
|  9|Mega 

### Other Interactions

##### Spark SQL

In [95]:
# Expose the DataFrame to Spark SQL under the name 'pokemon'.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run: an existing view with the same name is overwritten.
pokemon_df.createOrReplaceTempView('pokemon')

# Query the view with plain SQL; spark.sql returns a DataFrame.
spark.sql("""
SELECT Name, Speed
FROM pokemon
WHERE Name LIKE "%Deoxys%"
ORDER BY Speed DESC
""").show()

+--------------------+-----+
|                Name|Speed|
+--------------------+-----+
|  Deoxys Speed Forme|180.0|
| Deoxys Normal Forme|150.0|
|  DeoxysAttack Forme|150.0|
|Deoxys Defense Forme| 90.0|
+--------------------+-----+



##### RDDs

In [127]:
# Drop down to the RDD API for a row-wise map, then return to a DataFrame.
# Each Row becomes a [speed, doubled speed] pair; toDF names the two columns.
(
    pokemon_df
    .select('Speed')
    .rdd
    .map(lambda speed_row: [speed_row['Speed'], speed_row[0] * 2])
    .toDF(['Speed', '2 * Speed'])
    .show(2)
)

+-----+---------+
|Speed|2 * Speed|
+-----+---------+
| 45.0|     90.0|
| 60.0|    120.0|
+-----+---------+
only showing top 2 rows



##### Pandas

In [129]:
# Trim to two rows in Spark first, then convert to pandas. toPandas() collects
# every row it is given onto the driver, so limiting beforehand avoids pulling
# the whole DataFrame just to display its head.
pokemon_df.limit(2).toPandas()

Unnamed: 0,pid,Name,Class 1,Class 2,HP,Attack,Defense,Sp. Atk,Sp. Def,Speed,Legendary
0,1,Bulbasaur,Grass,Poison,45,49,49,65,65,45.0,False
1,2,Ivysaur,Grass,Poison,60,62,63,80,80,60.0,False
