<a href="https://colab.research.google.com/github/waltz2u/bd/blob/master/Basic_Spark_RDD_MorningStar_Rating_and_Stocks_DataProc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Basic Spark RDD with MorningStar Ratings and Stock Data

In [0]:
from pyspark.sql import SparkSession
# Get the active Spark session, or build one if none exists yet.
# "local[*]" requests a local master using as many worker threads as logical cores.
# NOTE(review): on a Dataproc/Colab kernel a session may already exist, in which case
# getOrCreate() hands that one back -- confirm which master is actually in effect.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

## MorningStar Ratings

In [0]:
# Load the MorningStar file as an RDD of raw text lines, one string per CSV row
# (fund name, ticker, rating -- see the sample output below).
lines = spark.sparkContext.textFile('gs://us-demo/morningstar.csv')
# Peek at the first 3 raw rows to check the format.
lines.take(3)

[u'ProFunds Short Precious Metals Inv,SPPIX,NA',
 u'ProFunds Short Precious Metals Svc,SPPSX,NA',
 u'ProFunds Biotechnology UltraSector Inv,BIPIX,4']

In [0]:
# Extract the rating, which is the last comma-separated field of each row.
# - rsplit(',', 1)[-1] takes the final field even if a fund name ever contains a
#   comma (split(',')[2] would then pick up the ticker instead).
# - strip() drops a trailing '\r' from Windows line endings, so '4\r' is not
#   counted separately from '4'.
# - Blank lines are skipped instead of raising IndexError inside the Spark job.
# Assumes the rating is always the final column, as in the sample rows above.
ratings = (
    lines
    .filter(lambda line: line.strip())
    .map(lambda line: line.rsplit(',', 1)[-1].strip())
)
ratings.take(10)

[u'NA', u'NA', u'4', u'3', u'5', u'5', u'4', u'4', u'4', u'4']

In [0]:
# countByValue() brings the tally back to the driver as a dict-like mapping of
# rating string -> number of funds with that rating (printed as a defaultdict below).
result = ratings.countByValue()
print(result)

defaultdict(<type 'int'>, {u'1': 24, u'NA': 19, u'3': 19, u'5': 18, u'4': 19})


In [0]:
import collections

# Order the rating -> count pairs by rating. The keys are strings, so the digit
# ratings sort ahead of 'NA'.
sortedCounts = collections.OrderedDict(sorted(result.items()))
for rating, count in sortedCounts.items():
    print("%s %i" % (rating, count))

1 24
3 19
4 19
5 18
NA 19


## Mean Price by Month

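Given a date and an adjusted closing price on each row, compute the average adjusted closing price for each month. Each month is keyed by its month and year concatenated without a separator (e.g. `12015` is January 2015).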

In [0]:
# this function gets month, year and adjusted closed price
def parseLine(line):
    fields = line.split(',')
    datetimedata = fields[0]
    datetimearray = datetimedata.split('/')
    monthyear = datetimearray[0]+datetimearray[2]
    adjclose = float(fields[6])
    return (monthyear, adjclose)

# Daily price file: one comma-separated row per date (see the sample output below).
# NOTE: this rebinds `lines`, which held the MorningStar rows earlier in the notebook.
# Re-running the ratings cells after this one would parse the price file instead.
lines = spark.sparkContext.textFile("gs://us-demo/adjclose.csv")

# `lines` is an RDD of raw text rows; take a few to check the format
lines.take(3)

[u'1/28/2015,49.799999,50.32,46.299999,46.459999,84839700,46.459999',
 u'1/27/2015,49.139999,49.279999,47.66,47.990002,45777200,47.990002',
 u'1/26/2015,49.57,49.790001,49.07,49.439999,18976400,49.439999']

In [0]:
# Apply parseLine to every row: each element becomes a
# (month+year key, adjusted close price) pair
rdd = lines.map(parseLine)
rdd.take(3)

[(u'12015', 46.459999), (u'12015', 47.990002), (u'12015', 49.439999)]

In [0]:
# mapValues pairs each adjusted price with a count of 1, keeping the month key:
# key -> (price, 1). This is only a preview; nothing is bound here.
rdd.mapValues(lambda price: (price, 1)).take(3)

[(u'12015', (46.459999, 1)),
 (u'12015', (47.990002, 1)),
 (u'12015', (49.439999, 1))]

In [0]:
# Per month key, add the (price, 1) tuples element-wise with reduceByKey.
# The result is (sum of prices, number of rows) for each month.
priceByMonthYear = (
    rdd
    .mapValues(lambda price: (price, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
)
priceByMonthYear.take(3)

[(u'102009', (372.749997, 22)),
 (u'42014', (735.050005, 21)),
 (u'32013', (454.3200039999999, 20))]

In [0]:
# Finally, divide each month's price sum by its row count to get the average
# adjusted close per month, then collect the (key, average) pairs to the driver.
averagesByMonthYear = priceByMonthYear.mapValues(lambda sum_count: sum_count[0] / sum_count[1])
results = averagesByMonthYear.collect()
# Loop with a name other than `result`. That global holds the rating counts from
# the MorningStar section, and overwriting it breaks re-running that section.
for month_average in results:
    print(month_average)

(u'102009', 16.94318168181818)
(u'42014', 35.00238119047619)
(u'32013', 22.716000199999996)
(u'32011', 16.660869608695652)
(u'82010', 13.796363636363639)
(u'82012', 15.226086956521744)
(u'22012', 15.371500000000003)
(u'22010', 15.268947368421053)
(u'22014', 37.43578963157895)
(u'12015', 48.438888722222224)
(u'42012', 15.253499999999997)
(u'62009', 15.957272772727272)
(u'12013', 19.815714523809525)
(u'52011', 16.98523780952381)
(u'12011', 16.42199995)
(u'92011', 14.10904761904762)
(u'92013', 30.171500100000003)
(u'72013', 27.304545590909093)
(u'112013', 34.82449995)
(u'72011', 14.505)
(u'102010', 15.444761809523808)
(u'102012', 16.12428576190476)
(u'102014', 41.39391286956522)
(u'112011', 15.508571428571432)
(u'52013', 26.159545227272723)
(u'82009', 14.75095238095238)
(u'122014', 50.34045431818181)
(u'122012', 19.413000049999997)
(u'122010', 16.585909136363632)
(u'82014', 36.87619052380952)
(u'62014', 34.958571333333325)
(u'62012', 15.420476190476192)
(u'62010', 15.030454545454544)
(u'4