# 0. RDD, DataFrame, Dataset

* RDD = Resilient Distributed Datasets
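* RDDs can be significantly slower in Python than in Java or Scala, because Python functions run in separate Python worker processes and data must be serialized between them and the JVM. DataFrame operations built from Spark's own functions are planned and executed inside the JVM, so their speed is about the same in every language.
* Datasets are statically typed, so the Dataset API is available only in Scala and Java, not in Python.
* Since Spark 2.0, DataFrame is the untyped Dataset API: in Scala, `DataFrame` is a type alias for `Dataset[Row]`.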

Tungsten is the name of the project that improves the memory and CPU efficiency of Spark's core execution engine.

# 1. RDD (schema-less ^_^)

In [1]:
import pyspark

sc = pyspark.SparkContext(appName='^_^')

In [2]:
# Create an RDD from a Python list
data = sc.parallelize([('tea', 777), ('coffee', 333), ('diet coke', 111), ('tea', 222)])
print(data.take(1))

# Read a text file (one RDD element per line). textFile() decompresses .gz and .bz2 files automatically, but .zip archives are not supported.
data_from_file = sc.textFile('rockers.txt')
parsed_data_from_file = data_from_file.map(lambda row: [x.strip() for x in row.split(',')])
print(parsed_data_from_file.takeSample(False, 2, 0))

[('tea', 777)]
[['Michael Jackson', '8', '29', '1958'], ['David Bowie', '1', '8', '1947']]
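
The function passed to `map()` is ordinary Python, so the parsing logic can be written and tested as a regular function before handing it to Spark. A minimal sketch, assuming each line of `rockers.txt` looks like `Michael Jackson, 8, 29, 1958` (inferred from the sample output above); `parse_rocker` is a hypothetical helper name. Unlike the lambda above, it also converts the date fields to `int`.

```python
def parse_rocker(line):
    """Split a comma-separated line into (name, month, day, year)."""
    # Same split/strip as the lambda above, then cast the date fields to int
    name, month, day, year = [field.strip() for field in line.split(',')]
    return name, int(month), int(day), int(year)


print(parse_rocker('Michael Jackson, 8, 29, 1958'))
# ('Michael Jackson', 8, 29, 1958)
```

It would then be used as `data_from_file.map(parse_rocker)`.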


Parsing examples:
https://github.com/drabastomek/learningPySpark/blob/master/Chapter02/LearningPySpark_Chapter02.ipynb

In [3]:
# Look at distinct values of a column
rockers_born_years = parsed_data_from_file.map(lambda row: row[3]).distinct()
rockers_born_years.collect()

['1940', '1947', '1943', '1942', '1958']

In [4]:
# reduce(): the function must be associative and commutative
data_sum = data.map(lambda row: row[1]).reduce(lambda x, y: x + y)
print(data_sum)

# reduceByKey(): sum the values per key (the key is the first element of each tuple)
data_sum_alt = data.reduceByKey(lambda x, y: x + y)
print(data_sum_alt.collect())

1443
[('tea', 999), ('diet coke', 111), ('coffee', 333)]
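
Spark reduces each partition separately and then combines the partial results in no guaranteed order, which is why the function must be associative and commutative. The plain-Python model below is not Spark code (`partitioned_reduce` and the two partition layouts are hypothetical); it uses the values from `data` and shows that addition gives the same total for any partitioning while subtraction does not.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(partitions, func):
    """Model of RDD.reduce: reduce inside each partition, then merge the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


one_way = [[777, 333], [111, 222]]
other_way = [[777], [333, 111, 222]]

print(partitioned_reduce(one_way, add), partitioned_reduce(other_way, add))  # 1443 1443
print(partitioned_reduce(one_way, sub), partitioned_reduce(other_way, sub))  # 555 777
```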


In [5]:
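# Counting: count() returns the number of elements, countByKey() returns per-key counts to the driver as a dict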
print(data.count())
print(data.countByKey().items())

4
dict_items([('diet coke', 1), ('tea', 2), ('coffee', 1)])
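
`reduceByKey` merges values with the same key inside each partition first and only then combines the per-partition results after the shuffle, which usually moves less data than `groupByKey` followed by a sum. A plain-Python model of that behavior over the same pairs as `data`; the two-partition split and the `reduce_by_key` helper are assumptions for illustration, not Spark code.

```python
from collections import Counter

pairs = [('tea', 777), ('coffee', 333), ('diet coke', 111), ('tea', 222)]
partitions = [pairs[:2], pairs[2:]]  # hypothetical split into two partitions


def reduce_by_key(partitions, func):
    """Model of RDD.reduceByKey: combine per partition, then merge across partitions."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:  # map-side combine within one partition
            local[key] = func(local[key], value) if key in local else value
        for key, value in local.items():  # merge the partial results
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


print(reduce_by_key(partitions, lambda x, y: x + y))
# {'tea': 999, 'coffee': 333, 'diet coke': 111}
print(Counter(key for key, _ in pairs))  # same counts as countByKey()
```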


In [6]:
sc.stop()

## Questions:
* Can I change the key column to something other than the first column?
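
Pair-RDD operations such as `reduceByKey` and `countByKey` always treat the first element of each 2-tuple as the key, so keying by another column means reshaping the rows first, e.g. `parsed_data_from_file.keyBy(lambda row: row[3])` or `.map(lambda row: (row[3], row))`. `keyBy(f)` produces `(f(x), x)` pairs. The plain-Python model below mimics that on three rows taken from the sample data in this notebook; `key_by` is a hypothetical stand-in, not Spark code.

```python
from collections import Counter

rows = [['Michael Jackson', '8', '29', '1958'],
        ['David Bowie', '1', '8', '1947'],
        ['Prince', '6', '7', '1958']]


def key_by(rows, f):
    """Model of RDD.keyBy: pair each element with the key computed from it."""
    return [(f(row), row) for row in rows]


# Key by birth year (column 3) instead of name (column 0)
by_year = key_by(rows, lambda row: row[3])
print(by_year[0])  # ('1958', ['Michael Jackson', '8', '29', '1958'])

# Counting per key now counts rockers per birth year, like countByKey()
print(Counter(year for year, _ in by_year))
```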

# 2. DataFrame (with schema!)
* `spark` below is a `pyspark.sql.SparkSession`, the entry point for DataFrames: https://spark.apache.org/docs/preview/api/python/pyspark.sql.html#pyspark.sql.SparkSession
* Reading CSV files (`DataFrameReader.csv`): http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

In [7]:
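from pyspark.sql import SparkSession

# SparkSession is the entry point for DataFrames; it wraps a SparkContext
spark = SparkSession.builder.appName('^_^').getOrCreate()
sc = spark.sparkContext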

In [8]:
# Read a CSV file with an explicit schema (one data type per column)
import pyspark.sql.types as typ

schema = typ.StructType([typ.StructField('name', typ.StringType()), 
                         typ.StructField('born_month', typ.IntegerType()), 
                         typ.StructField('born_day', typ.IntegerType()), 
                         typ.StructField('born_year', typ.IntegerType())])

df = spark.read.csv('rockers.csv', schema, header=True,
                    ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)

df.show()
df.printSchema()

+---------------+----------+--------+---------+
|           name|born_month|born_day|born_year|
+---------------+----------+--------+---------+
|   Jimi Hendrix|        11|      27|     1942|
|   Jim Morrison|        12|       8|     1943|
|    John Lennon|        10|       9|     1940|
|   Janis Joplin|         1|      19|     1943|
|    Mick Jagger|         7|      26|     1943|
|    David Bowie|         1|       8|     1947|
|     Elton John|         3|      25|     1947|
|Michael Jackson|         8|      29|     1958|
|         Prince|         6|       7|     1958|
+---------------+----------+--------+---------+

root
 |-- name: string (nullable = true)
 |-- born_month: integer (nullable = true)
 |-- born_day: integer (nullable = true)
 |-- born_year: integer (nullable = true)



In [9]:
# Read JSON strings from an RDD (one JSON object per element); the schema is inferred
json_str_1 = '{"id": "0", "name": "Zelda", "color": "blue", "birth month": 12, "score": 99}'
json_str_2 = '{"id": "11", "name": "Tomato", "color": "green", "birth month": 8, "score": 31}'
json_str_3 = '{"id": "935", "name": "Katy", "color": "pink", "birth month": 3, "score": 79}'
json_str_4 = '{"id": "1000", "name": "Tree", "color": "green", "birth month": 1, "score": 88}'
json_str_5 = '{"id": "1001", "name": "Kitty", "color": "pink", "birth month": 9, "score": ""}'
json_list = [json_str_1, json_str_2, json_str_3, json_str_4, json_str_5]

rdd = sc.parallelize(json_list)
df = spark.read.json(rdd)

df.show()
df.printSchema()

+-----------+-----+----+------+-----+
|birth month|color|  id|  name|score|
+-----------+-----+----+------+-----+
|         12| blue|   0| Zelda|   99|
|          8|green|  11|Tomato|   31|
|          3| pink| 935|  Katy|   79|
|          1|green|1000|  Tree|   88|
|          9| pink|1001| Kitty| null|
+-----------+-----+----+------+-----+

root
 |-- birth month: long (nullable = true)
 |-- color: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- score: long (nullable = true)



In [10]:
# Register the DataFrame as a temporary view so SQL can query it
df.createOrReplaceTempView('df')

# Run a SQL query against the view (backticks quote a column name containing a space)
# Avoid .collect() on large data: it copies every row to the driver. Use .show(n) or .take(n) to look at a few rows.
spark.sql('select * from df where `birth month` > 3').show(2)

# The same kind of query with the DataFrame API
cols = ['name', 'birth month', 'color']
df.select(*cols).filter('`birth month` > 3 and color = "green"').show()

+-----------+-----+---+------+-----+
|birth month|color| id|  name|score|
+-----------+-----+---+------+-----+
|         12| blue|  0| Zelda|   99|
|          8|green| 11|Tomato|   31|
+-----------+-----+---+------+-----+
only showing top 2 rows

+------+-----------+-----+
|  name|birth month|color|
+------+-----------+-----+
|Tomato|          8|green|
+------+-----------+-----+



In [11]:
# Count the distinct values in each column (nulls are not counted)
from pyspark.sql.functions import countDistinct

df_agg = df.agg(*(countDistinct(col).alias(col + ' count') for col in df.columns))
df_agg.show()

+-----------------+-----------+--------+----------+-----------+
|birth month count|color count|id count|name count|score count|
+-----------------+-----------+--------+----------+-----------+
|                5|          3|       5|         5|          4|
+-----------------+-----------+--------+----------+-----------+
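
`countDistinct` ignores nulls, which is why `score count` is 4 rather than 5: Kitty's empty score was read as `null`. A plain-Python model of the same aggregation, with the records copied from the `df.show()` output above and `null` written as `None`:

```python
records = [
    {'birth month': 12, 'color': 'blue', 'id': '0', 'name': 'Zelda', 'score': 99},
    {'birth month': 8, 'color': 'green', 'id': '11', 'name': 'Tomato', 'score': 31},
    {'birth month': 3, 'color': 'pink', 'id': '935', 'name': 'Katy', 'score': 79},
    {'birth month': 1, 'color': 'green', 'id': '1000', 'name': 'Tree', 'score': 88},
    {'birth month': 9, 'color': 'pink', 'id': '1001', 'name': 'Kitty', 'score': None},
]

# One distinct count per column, skipping None just as countDistinct skips null
counts = {col + ' count': len({r[col] for r in records if r[col] is not None})
          for col in records[0]}
print(counts)
# {'birth month count': 5, 'color count': 3, 'id count': 5, 'name count': 5, 'score count': 4}
```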



In [12]:
# Add a unique 64-bit ID column (increasing, but not consecutive)
from pyspark.sql.functions import monotonically_increasing_id

df_with_new_id = df.withColumn('new_id', monotonically_increasing_id())
df_with_new_id.show()

+-----------+-----+----+------+-----+-----------+
|birth month|color|  id|  name|score|     new_id|
+-----------+-----+----+------+-----+-----------+
|         12| blue|   0| Zelda|   99|          0|
|          8|green|  11|Tomato|   31| 8589934592|
|          3| pink| 935|  Katy|   79|17179869184|
|          1|green|1000|  Tree|   88|25769803776|
|          9| pink|1001| Kitty| null|25769803777|
+-----------+-----+----+------+-----+-----------+
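
The IDs are unique and increasing but not consecutive. According to the PySpark documentation for `monotonically_increasing_id`, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. Decoding the values above in plain Python (`decode_id` is a hypothetical helper) suggests the five rows sat in four partitions, with Tree and Kitty sharing the last one:

```python
ids = [0, 8589934592, 17179869184, 25769803776, 25769803777]


def decode_id(new_id):
    """Split a monotonically_increasing_id value into (partition ID, record number)."""
    return new_id >> 33, new_id & ((1 << 33) - 1)


print([decode_id(i) for i in ids])
# [(0, 0), (1, 0), (2, 0), (3, 0), (3, 1)]
```

If consecutive IDs 0 to n-1 are needed instead, `RDD.zipWithIndex()` is one option; it triggers an extra Spark job when the RDD has more than one partition.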



In [13]:
# Summary statistics (count, mean, stddev, min, max) for numeric columns
df.describe(['birth month', 'score']).show()

+-------+-----------------+------------------+
|summary|      birth month|             score|
+-------+-----------------+------------------+
|  count|                5|                 4|
|   mean|              6.6|             74.25|
| stddev|4.505552130427524|29.970819141291415|
|    min|                1|                31|
|    max|               12|                99|
+-------+-----------------+------------------+
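
`describe()` skips nulls (hence `count` 4 for `score`) and its `stddev` is the sample standard deviation, dividing by n - 1. A quick cross-check with Python's `statistics` module, using the column values from the table above; the results should agree with the `describe()` output up to floating-point rounding.

```python
import statistics

birth_month = [12, 8, 3, 1, 9]
score = [99, 31, 79, 88]  # Kitty's null score is excluded, as in describe()

for name, values in [('birth month', birth_month), ('score', score)]:
    # statistics.stdev is the sample standard deviation (n - 1 in the denominator)
    print(name, len(values), statistics.mean(values), statistics.stdev(values),
          min(values), max(values))
```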



In [14]:
sc.stop()

# 3. Machine Learning
* MLlib (`pyspark.mllib`) uses RDDs; this RDD-based API has been in maintenance mode since Spark 2.0.
  * Example: https://github.com/drabastomek/learningPySpark/blob/master/Chapter05/LearningPySpark_Chapter05.ipynb
  * Docs: http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html
* ML (`pyspark.ml`) uses DataFrames and is the primary ML API, but some parts are still marked experimental.
  * Example: https://github.com/drabastomek/learningPySpark/blob/master/Chapter06/LearningPySpark_Chapter06.ipynb
  * Docs: http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html