### Documentation
https://spark.apache.org/docs/latest/api/python/index.html

In [None]:
from pyspark import SparkContext
from pyspark.sql import HiveContext
# Create the Spark entry points used by every later cell:
# `sc` for RDD work and streaming, `sqlctx` for the SQL queries.
sc = SparkContext()
sqlctx = HiveContext(sc)

In [None]:
# Display both contexts to confirm they were created.
sc, sqlctx

### Hadoop

The sample file was uploaded to HDFS with:

`hdfs dfs -put sample /user/mbrynski/`


In [None]:
# RDD of text lines from the sample file uploaded to HDFS (see the command above).
# NOTE(review): `file` shadows a Python 2 builtin; consider renaming it
# together with the later cells that use it.
file = sc.textFile('/user/mbrynski/sample')

In [None]:
# Peek at the first raw input line.
file.first()

### File format
Input lines use the Vowpal Wabbit format; online format validator:
http://hunch.net/~vw/validate.html

In [None]:
import re
def parse_line(line):
    """Split one Vowpal Wabbit-style input line into its groups.

    A separator is a space followed by one or more ``|`` / ``'`` markers,
    each optionally followed by a single lowercase namespace letter and a
    space (e.g. `` '`` or `` |a ``).  For example
    ``"1 'id1 |a k:v"`` becomes ``['1', 'id1', 'k:v']``.

    Returns the list of group strings (first element is the label).
    """
    # Split directly on the separator pattern instead of substituting a
    # sentinel character and splitting on it: a sentinel such as '#' would
    # also split on any literal '#' inside feature values.  The groups are
    # non-capturing so re.split does not insert the separators themselves
    # into the result.
    return re.split(r" (?:[|'](?:[a-z] )?)+", line)

In [None]:
# Split every raw line into its groups (label, header, feature groups)
# and show the first result.
rdd = file.map(parse_line)
rdd.first()

In [None]:
def parse_groups(line):
    """Convert the group list from ``parse_line`` into a flatMap-friendly record.

    Expected layout of ``line``:
      * ``line[0]``  -- integer click label,
      * ``line[1]``  -- comma-separated header with at least five fields:
        id, timestamp, two ignored fields, ip,
      * ``line[2:]`` -- space-separated ``name:value`` feature tokens (only the
        first two ``:``-separated parts of a token are kept).

    Returns ``[record]`` (a dict of features plus ``click``, ``id``,
    ``timestamp`` and ``ip``; those keys overwrite same-named features) or
    ``[]`` for a malformed line, so ``flatMap`` drops it.  A line is malformed
    when it has too few groups, a non-integer label, a header with fewer than
    five fields, or any feature token without ``:``.  The last case includes
    the empty token produced by a line with no feature groups or by repeated
    spaces.
    """
    try:
        tokens = ' '.join(line[2:]).split(' ')
        out = {k: v for k, v in (tok.split(':')[0:2] for tok in tokens)}
        out['click'] = int(line[0])
        out['id'], out['timestamp'], _, _, out['ip'] = line[1].split(',')[0:5]
    except (ValueError, IndexError):
        # Malformed input line: drop it.  Any other exception indicates a
        # programming error (e.g. non-string input) and is allowed to surface
        # instead of silently emptying the dataset.
        return []
    return [out]

In [None]:
# Turn grouped lines into dict records; malformed lines are dropped because
# parse_groups returns [] for them.
# NOTE(review): `rdd` is rebound here, so re-running this cell applies
# parse_groups a second time to already-parsed records.
rdd = rdd.flatMap(parse_groups)
rdd.first()

In [None]:
# Build a DataFrame from the dict records and expose it to SQL as table `data`.
sqlctx.createDataFrame(rdd).registerTempTable('data')

In [None]:
# Number of records with a positive click label.
sqlctx.sql('select * from data where click > 0').count()

In [None]:
# Record count and total clicks per value of the `lang` feature, most frequent first.
sqlctx.sql('select lang, count(*) count, sum(click) clicks from data group by lang order by count desc').show()

In [None]:
# Record count and total clicks per value of the `OperatingSystemFamilyName`
# feature, most frequent first.
sqlctx.sql('select OperatingSystemFamilyName, count(*) count, sum(click) clicks from data group by OperatingSystemFamilyName order by count desc').show()

In [None]:
# Number of raw input lines; compare with the next cell to see how many
# lines parse_groups dropped as malformed.
file.count()

In [None]:
# Number of parsed records currently registered as table `data`.
sqlctx.sql('select * from data').count()

### Streaming

To feed the stream, put a new file into the HDFS directory watched by the streaming job:

`hdfs dfs -put sample '/user/mbrynski/streaming/samplexxx'`

In [None]:
from pyspark.streaming import StreamingContext
# Streaming context on top of the existing SparkContext with a 5-second batch interval.
ssc = StreamingContext(sc, 5)

In [None]:
# DStream of lines from new files that appear in this HDFS directory.
dstream = ssc.textFileStream('/user/mbrynski/streaming')

In [None]:
def processSQL(time, rdd):
    """Print the per-language click summary for one streaming micro-batch.

    Intended for ``foreachRDD``: called with the batch time and the batch's
    RDD of parsed records (dicts produced by ``parse_groups``).

    The batch is registered as temp table ``data``, which replaces the
    ``data`` table registered earlier from the static sample file.
    Errors are printed rather than raised, so processing is best-effort
    per batch.
    """
    print("========= %s =========" % str(time))
    try:
        # textFileStream produces an empty RDD for every interval in which
        # no new file arrived; createDataFrame needs rows to infer a schema,
        # so skip such batches instead of printing the same error each time.
        if rdd.isEmpty():
            return
        sqlctx.createDataFrame(rdd).registerTempTable('data')
        sqlctx.sql('''
        select lang, count(*) count, sum(click) clicks 
        from data 
        group by lang 
        order by count desc
        ''').show()
    except Exception as e:
        print(e)


In [None]:
# Per batch: split lines into groups, convert them to records (dropping
# malformed lines), then run the SQL summary in processSQL.
dstream.map(parse_line).flatMap(parse_groups).foreachRDD(processSQL)

In [None]:
# Start processing batches; new files put into the streaming directory are
# picked up in subsequent batches.
ssc.start()