# In [None]:
import sys, os
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

'''
Join the batting and salaries data for Barry Bonds per year.

The output should be the combined CSV string of batting and salaries data (one per year).

Use 'join' as the key for the final output in the reducer.

E.g:
"join"  "bondsba01,1986,1,PIT,NL,113,413,72,92,26,3,16,48,36,7,65,102,2,2,2,2,4,1986,PIT,NL,bondsba01,60000"

Schema:
Salaries: yearID	teamID	lgID	playerID	salary
Batting: playerID	yearID	stint	teamID	lgID	G	AB	R	H	2B	3B	HR	RBI	SB	CS	BB	SO

Hints: 
Use split to split the CSV lines (e.g. s = line.split(','))
Both files are sent to the mapper. Use the length of the lines to determine which is which.
'''
if __name__ == '__main__':
    # Spark context: build the configuration one setting at a time.
    conf = SparkConf()
    conf.setAppName("4_join")
    conf.set("spark.streaming.concurrentJobs", "2")
    # NOTE(review): presumably disabled so that saving to the same output path
    # on every batch is not rejected -- confirm.
    conf.set('spark.hadoop.validateOutputSpecs', False)
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")

    # Streaming context on top of sc, created with batchDuration=1.
    ssc = StreamingContext(sparkContext=sc, batchDuration=1)

    # Checkpoint directory for the streaming context.
    ssc.checkpoint("/root/tmp")

    # Subscribe to both Kafka topics using the Direct Approach (no receivers).
    # For details about the Direct Approach, see:
    # https://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html
    kafkaStream = KafkaUtils.createDirectStream(
        ssc=ssc,
        topics=['4_join_batting', '4_join_salaries'],
        kafkaParams={"metadata.broker.list": '<your ec2 instance public IP>:9092'})
    
    '''
    Steps of joining samples from Salaries and Batting tables:
    1. filter samples by playerID: bondsba01
    2. determine whether a sample is from Salaries table or Batting table
    3. generate (key, value) pairs where yearID is the key

    ----- Once the above steps are done, we can start joining samples...
    
    What does updateStateByKey do?
    Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

    ---------------------------------------------------------------------
    4. Since the order in which Salaries and Batting samples arrive is nondeterministic, we need to use the updateStateByKey function to maintain and update the state for each key (yearID).
    5. Apply a sort function to every RDD within the DStream, so that we can get the output sorted by yearID.
    '''

    # Filter playerID and distinguish salaries and batting samples
    def categorize(fields):
      """
      Tag one comma-split record and key it by year, keeping only bondsba01.

      :param fields: list of the comma-separated fields of one input line
      :return: (yearID, ('S', line)) for a 5-field record whose 4th field is bondsba01,
               (yearID, ('B', line)) for any other record whose 1st field is bondsba01,
               (None, None) otherwise; `line` is the fields re-joined with commas
      """
      player = 'bondsba01'
      is_salary = len(fields) == 5 and fields[3] == player

      if is_salary:
        # Salaries layout: the year is the first field
        year, tag = fields[0], 'S'
      elif fields[0] == player:
        # Batting layout: the year follows the playerID
        year, tag = fields[1], 'B'
      else:
        # some other player: nothing to emit
        return None, None

      return year, (tag, ','.join(fields))

    def updateState(new, old):
      """
      Fold the records received for one yearID into that key's join state.

      This is the update function handed to updateStateByKey, so it returns
      only the new state value; the key (yearID) is kept by the caller.

      :param new: list of ('S' | 'B', csv_line) tuples received for the key in
                  the current batch (may be empty or None)
      :param old: previous state of the key: None, a list of still-unmatched
                  tagged tuples, or the already-joined CSV string
      :return: the joined "salaries,batting" CSV string once both a Salaries
               and a Batting record are known; otherwise the list of pending
               tagged tuples; None when there is nothing to remember
      """
      # A non-empty state that is not a list is the finished join string.
      # Keep it as is: extending a list with a string would splice in its
      # individual characters.
      if old and not isinstance(old, list):
        return old

      # Work on a copy so the list passed in by the caller is never mutated.
      pending = list(new) if new else []
      if old:
        pending.extend(old)
      if not pending:
        return None

      # Pick the records by tag, because the arrival order of the two topics
      # is not fixed.
      salaries = next((record for tag, record in pending if tag == 'S'), None)
      batting = next((record for tag, record in pending if tag == 'B'), None)
      if salaries is None or batting is None:
        # Only one side seen so far: keep waiting for the other one.
        return pending

      # NOTE(review): joined as salaries + batting, as requested for this
      # function; the example in the header comment of this file shows the
      # batting fields first -- confirm which order is expected.
      return salaries + ',' + batting

    """    
    kafkaStream receives samples from kafka broker, then converts them to key-value pair RDD.
    Each line in the lambda statement of the map function is a tuple like below:
    (None, 'webbsk01,1945,1,DET,AL,118,118,407,43,81,12,2,0,21,8,7,30,35,,0,12,,9,118\n')
    (None, 'webbsk01,1946,1,DET,AL,64,64,169,12,37,1,1,0,17,3,3,9,18,,0,3,,5,64\n')
    In the initial state, Spark set the key to None. 
    Note this key is for RDD, it has nothing to do with kafka topics or anything else.

    Since the categorize function returns (None, None) when the playerID is not bondsba01,
    the filter function keeps only the Salaries and Batting records for bondsba01 in the RDD.
    updateStateByKey then groups the values that share the same key (yearID) into a list.

    So the new and old parameters of the updateState function are each a list or None, like below:
    [('S', '1989,PIT,NL,bondsba01,360000')] None
    [('B', 'bondsba01,1990,1,PIT,NL,151,151,519,104,156,32,3,33,114,52,13,93,83,15,3,0,6,8,151')] [('S', '1990,PIT,NL,bondsba01,850000')]
    """
    # Pipeline over the Kafka stream:
    #   1. take the message value (line[1]), strip it and split it on commas,
    #      then let categorize turn it into a (yearID, tagged record) pair;
    #   2. drop pairs whose key is None (records of other players);
    #   3. join the Salaries and Batting records per yearID via updateState;
    #   4. sort each RDD by yearID in ascending order. Sorting straight into a
    #      single partition keeps the order intact in the saved file; a later
    #      repartition(1) would shuffle the records again.
    kstream = kafkaStream.map(lambda line: categorize(line[1].strip().split(','))) \
                         .filter(lambda line: line[0]) \
                         .updateStateByKey(updateState) \
                         .transform(lambda rdd: rdd.sortByKey(ascending=True, numPartitions=1))

    # kstream is a DStream of (yearID, state) pairs after the operations above.
    # Save every RDD of key-value pairs as a text file; each RDD already has a
    # single sorted partition, so no further repartitioning is applied.
    # Alternative that writes one local directory per batch:
    #   kstream.saveAsTextFiles('./4_join/output')
    kstream.foreachRDD(lambda rdd: rdd.saveAsTextFile('s3://vandy-bigdata-kzw/hw6/4_join.out'))


    # Start the streaming context, wait for termination or the timeout of 120
    # passed below, then stop it.
    ssc.start()
    ssc.awaitTerminationOrTimeout(120)
    ssc.stop()
