In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
import os
import csv
from io import StringIO
import json
import wget

In [2]:
# Local-mode Spark configuration shared by every cell below; each cell builds
# (and stops) its own SparkContext from this conf.
conf = (
    SparkConf()
    .setAppName("tutorial")
    .setMaster("local")
)

## Basic RDD transformations

In [22]:
# The `with` block stops the SparkContext even if a step raises, so re-running
# this (or the next) cell never hits "Cannot run multiple SparkContexts".
with SparkContext(conf=conf) as sc:
    lines = sc.textFile('README.md')
    nums = sc.parallelize([1, 2, 3, 3])
    # Label -> RDD. Replaces the old eval()-over-names loop, which forced
    # module-level variables named `map` and `filter` that shadowed builtins.
    basic_transformations = {
        'lines': lines,
        'nums': nums,
        # Key each line by its first space-separated token.
        'map': lines.map(lambda x: (x.split(" ")[0], x)),
        'flatMap': nums.flatMap(lambda x: list(range(x, 4))),
        'filter': lines.filter(lambda line: "Python" in line),
        'distinct': nums.distinct(),
        # Fixed seed so re-running the notebook prints the same sample.
        'sample': nums.sample(False, 0.5, seed=42),
    }
    for name, rdd in basic_transformations.items():
        print(f"\n{name}:{rdd.take(10)}")


lines:['# Apache Spark', '', 'Spark is a unified analytics engine for large-scale data processing. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a', 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', 'MLlib for machine learning, GraphX for graph processing,', 'and Structured Streaming for stream processing.', '', '<https://spark.apache.org/>']

nums:[1, 2, 3, 3]

map:[('#', '# Apache Spark'), ('', ''), ('Spark', 'Spark is a unified analytics engine for large-scale data processing. It provides'), ('high-level', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'), ('supports', 'supports general computation graphs for data analysis. It also supports a'), ('rich', 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,'), ('MLlib', 'MLlib for machine learning, GraphX for graph processing,'), ('and'

## Transformations on two RDDs

In [23]:
# The `with` block guarantees sc.stop() even if one of the jobs fails.
with SparkContext(conf=conf) as sc:
    nums = sc.parallelize([1, 2, 3, 3])
    others = sc.parallelize([3, 3, 6, 7])
    # Label -> RDD, printed in insertion order (replaces eval() over names).
    two_rdd_transformations = {
        'nums': nums,
        'others': others,
        'union': nums.union(others),
        'intersection': nums.intersection(others),
        'subtract': nums.subtract(others),
        'cartesian': nums.cartesian(others),
    }
    for name, rdd in two_rdd_transformations.items():
        print(f"\n{name}:{rdd.take(10)}")


nums:[1, 2, 3, 3]

others:[3, 3, 6, 7]

union:[1, 2, 3, 3, 3, 3, 6, 7]

intersection:[3]

subtract:[2, 1]

cartesian:[(1, 3), (1, 3), (1, 6), (1, 7), (2, 3), (2, 3), (2, 6), (2, 7), (3, 3), (3, 3)]


In [20]:
## Basic actions on an RDD

In [24]:
def squared(x):
    """Print a single RDD element; used as the `rdd.foreach` callback below.

    NOTE(review): despite its name this prints `x` unchanged, not its square.
    foreach runs in the Python worker, so the printed values presumably appear
    on the console that launched Jupyter rather than in this notebook - confirm.
    """
    print(x)


# The `with` block stops the SparkContext even if an action raises.
with SparkContext(conf=conf) as sc:
    rdd = sc.parallelize([1, 2, 3, 3])
    # Label -> action result. Values are computed in this order, the same order
    # as before; the dict replaces the eval()-over-names loop.
    actions = {
        'rdd': rdd,
        'collect': rdd.collect(),
        'count': rdd.count(),
        'countByValue': rdd.countByValue().items(),
        'take': rdd.take(2),
        'top': rdd.top(2),
        'takeOrdered': rdd.takeOrdered(2),
        # Fixed seed so re-running the notebook prints the same sample.
        'takeSample': rdd.takeSample(False, 1, seed=42),
        'reduce': rdd.reduce(lambda x, y: x + y),
        'fold': rdd.fold(0, lambda x, y: x + y),
        # (sum, count) accumulator: the first function folds one element into a
        # partition's accumulator, the second merges two partition accumulators.
        'aggregate': rdd.aggregate(
            (0, 0),
            lambda acc, value: (acc[0] + value, acc[1] + 1),
            lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
        ),
        # foreach is run only for its side effect and returns None.
        'foreach': rdd.foreach(squared),
    }
    for name, result in actions.items():
        print(f"\n{name}:{result}")


rdd:ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

collect:[1, 2, 3, 3]

count:4

countByValue:dict_items([(1, 1), (2, 1), (3, 2)])

take:[1, 2]

top:[3, 3]

takeOrdered:[1, 2]

takeSample:[1]

reduce:9

fold:9

aggregate:(9, 4)

foreach:None


## Transformations on one pair RDD

In [25]:
# The `with` block stops the SparkContext even if a transformation fails.
with SparkContext(conf=conf) as sc:
    pairs = sc.parallelize([(1, 2), (3, 4), (3, 6)])
    # Per-key (sum, count): create from the first value, fold in further values
    # within a partition, then merge partial results across partitions.
    sumCount = pairs.combineByKey(
        lambda value: (value, 1),
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
    )
    # Label -> RDD, printed in insertion order (replaces eval() over names).
    pair_transformations = {
        'pairs': pairs,
        'reduceByKey': pairs.reduceByKey(lambda x, y: x + y),
        # groupByKey values are lazy iterables; materialize them so they print.
        'groupByKey': pairs.groupByKey().mapValues(list),
        # Per-key mean = sum / count.
        'combineByKey': sumCount.mapValues(lambda acc: acc[0] / acc[1]),
        'mapValues': pairs.mapValues(lambda x: x + 1),
        'flatMapValues': pairs.flatMapValues(lambda x: list(range(x, 6))),
        'keys': pairs.keys(),
        'values': pairs.values(),
        'sortByKey': pairs.sortByKey(),
    }
    for name, rdd in pair_transformations.items():
        print(f"\n{name}:{rdd.collect()}")


pairs:[(1, 2), (3, 4), (3, 6)]

reduceByKey:[(1, 2), (3, 10)]

groupByKey:[(1, [2]), (3, [4, 6])]

combineByKey:[(1, 2.0), (3, 5.0)]

mapValues:[(1, 3), (3, 5), (3, 7)]

flatMapValues:[(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)]

keys:[1, 3, 3]

values:[2, 4, 6]

sortByKey:[(1, 2), (3, 4), (3, 6)]


## Transformations on two pair RDDs

In [26]:
# The `with` block stops the SparkContext even if a join fails.
with SparkContext(conf=conf) as sc:
    pair1 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
    pair2 = sc.parallelize([(3, 9)])
    # Label -> RDD, printed in insertion order (replaces eval() over names).
    two_pair_transformations = {
        'pair1': pair1,
        'pair2': pair2,
        'subtractByKey': pair1.subtractByKey(pair2),
        'join': pair1.join(pair2),
        'rightOuterJoin': pair1.rightOuterJoin(pair2),
        'leftOuterJoin': pair1.leftOuterJoin(pair2),
        # cogroup values are a pair of lazy iterables; materialize both sides.
        'cogroup': pair1.cogroup(pair2).mapValues(
            lambda groups: (list(groups[0]), list(groups[1]))
        ),
    }
    for name, rdd in two_pair_transformations.items():
        print(f"\n{name}:{rdd.collect()}")


pair1:[(1, 2), (3, 4), (3, 6)]

pair2:[(3, 9)]

subtractByKey:[(1, 2)]

join:[(3, (4, 9)), (3, (6, 9))]

rightOuterJoin:[(3, (4, 9)), (3, (6, 9))]

leftOuterJoin:[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]

cogroup:[(1, ([2], [])), (3, ([4, 6], [9]))]


## Actions on pair RDDs

In [27]:
# The `with` block stops the SparkContext even if an action fails.
with SparkContext(conf=conf) as sc:
    pairs = sc.parallelize([(1, 2), (3, 4), (3, 6)])
    # Label -> action result (the RDD itself prints via its repr).
    pair_actions = {
        'pairs': pairs,
        'countByKey': pairs.countByKey().items(),
        # Duplicate keys collapse: the later value for key 3 wins.
        'collectAsMap': pairs.collectAsMap(),
        'lookup': pairs.lookup(3),
    }
    for name, result in pair_actions.items():
        print(f"\n{name}:{result}")


pairs:ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

countByKey:dict_items([(1, 1), (3, 2)])

collectAsMap:{1: 2, 3: 6}

lookup:[4, 6]


## Load and save datasets (JSON and CSV)

In [28]:
sc = SparkContext(conf=conf)
# Each line of pandas.txt is parsed independently as one JSON document.
# (Named pandasLines rather than `input` to avoid shadowing the builtin.)
pandasLines = sc.textFile('input/pandas.txt')
data = pandasLines.map(json.loads)
# saveAsTextFile refuses to overwrite an existing directory, so only write on
# the first run.
if not os.path.isdir('output/lovespandas'):
    (data
     .filter(lambda record: record['lovesPandas'])
     .map(json.dumps)
     .saveAsTextFile('output/lovespandas'))

def loadRecord(line, fieldnames=('name', 'favoriteAnimal')):
    """Parse a single CSV line into a dict keyed by `fieldnames`.

    Args:
        line: one line of CSV text (as produced by `sc.textFile`).
        fieldnames: column names to assign, in order. Defaults to the
            name/favoriteAnimal schema used throughout this notebook.

    Returns:
        A dict mapping each field name to its string value. Missing trailing
        fields are None (csv.DictReader's default `restval`). A blank line
        yields a dict with every field set to None, instead of letting
        StopIteration escape from inside a Spark `map` function.
    """
    reader = csv.DictReader(StringIO(line), fieldnames=list(fieldnames))
    # DictReader skips empty rows, so a blank line produces no row at all.
    return next(reader, dict.fromkeys(fieldnames))
# Parse csv.txt one line at a time; each line becomes one record dict.
# (Named csvLineRecords rather than `input` to avoid shadowing the builtin.)
csvLineRecords = sc.textFile('input/csv.txt').map(loadRecord)
print(f'csv.txt:{csvLineRecords.collect()}')
def loadRecords(fileNameContents):
    """Parse an entire CSV file delivered by `sc.wholeTextFiles`.

    Args:
        fileNameContents: a (path, contents) pair; only element 1, the file
            contents, is read.

    Returns:
        A lazy csv.DictReader yielding one dict per row, keyed by
        "name" and "favoriteAnimal".
    """
    contents = fileNameContents[1]
    return csv.DictReader(StringIO(contents), fieldnames=["name", "favoriteAnimal"])
# wholeTextFiles yields one (path, contents) pair per file; loadRecords then
# expands each file into its individual record dicts.
wholeFiles = sc.wholeTextFiles('input/csv.txt')
fullFileData = wholeFiles.flatMap(loadRecords)
print(f'csv.txt:{fullFileData.collect()}')

def writeRecords(records, fieldnames=("name", "favoriteAnimal")):
    """Serialize one partition of record dicts to CSV text for `mapPartitions`.

    Args:
        records: iterable of dicts keyed by `fieldnames` (one RDD partition).
        fieldnames: column order for the output. Defaults to the
            name/favoriteAnimal schema used throughout this notebook.

    Returns:
        A one-element list holding the partition's CSV rows, newline-separated,
        with no header and no trailing newline. Returns an empty list when the
        partition has no records.
    """
    output = StringIO()
    # saveAsTextFile appends a newline after every element, so rows use a plain
    # "\n" terminator (csv defaults to "\r\n") and the final terminator is
    # dropped below; otherwise the file ends up with mixed line endings.
    writer = csv.DictWriter(output, fieldnames=list(fieldnames), lineterminator="\n")
    for record in records:
        writer.writerow(record)
    text = output.getvalue()
    # An empty partition used to emit [""], which became a blank output line.
    return [text[:-1]] if text else []

csvOutputDir = 'output/csv'
# Write once only: saveAsTextFile will not overwrite an existing directory.
if not os.path.isdir(csvOutputDir):
    csvText = fullFileData.mapPartitions(writeRecords)
    csvText.saveAsTextFile(csvOutputDir)
sc.stop()

csv.txt:[{'name': '1 Kiyeon', 'favoriteAnimal': None}, {'name': '2 Chungyeol', 'favoriteAnimal': None}]
csv.txt:[{'name': '1 Kiyeon', 'favoriteAnimal': None}, {'name': '2 Chungyeol', 'favoriteAnimal': None}]


## Create a Hive SQL table from a text file

In [29]:
url = 'https://raw.githubusercontent.com/apache/spark/master/examples/src/main/resources/kv1.txt'
inputFile = 'input/kv1.txt'
inputTable = 'myTable'
# Download the sample key/value file once; create its directory first so the
# download does not fail on a fresh checkout.
if not os.path.isfile(inputFile):
    os.makedirs(os.path.dirname(inputFile) or '.', exist_ok=True)
    wget.download(url, inputFile)

# The `with` block stops the SparkContext even if a query fails.
with SparkContext(conf=conf) as sc:
    hiveCtx = HiveContext(sc)
    # Spark/Hive store table names lower-cased, so the managed table directory
    # is spark-warehouse/mytable. Checking the mixed-case name never matches on
    # a case-sensitive filesystem, and LOAD DATA would then append duplicate
    # rows on every re-run.
    if not os.path.isdir(f'spark-warehouse/{inputTable.lower()}'):
        hiveCtx.sql(f"CREATE TABLE IF NOT EXISTS {inputTable} (key INT, value STRING)")
        # Load into the configured table (this was previously hard-coded as myTable).
        hiveCtx.sql(f"LOAD DATA LOCAL INPATH '{inputFile}' INTO TABLE {inputTable}")
    # Named kvTable rather than `input` to avoid shadowing the builtin.
    kvTable = hiveCtx.sql(f"FROM {inputTable} SELECT key, value")
    print(f'\nmyTable query: {kvTable.rdd.take(10)}')
    print(f'\nmyTable key: {kvTable.rdd.map(lambda row: row[0]).take(10)}')


myTable query: [Row(key=238, value='val_238'), Row(key=86, value='val_86'), Row(key=311, value='val_311'), Row(key=27, value='val_27'), Row(key=165, value='val_165'), Row(key=409, value='val_409'), Row(key=255, value='val_255'), Row(key=278, value='val_278'), Row(key=98, value='val_98'), Row(key=484, value='val_484')]

myTable key: [238, 86, 311, 27, 165, 409, 255, 278, 98, 484]


## Read a JSON file into a table

In [30]:
# The `with` block stops the SparkContext even if reading or querying fails.
with SparkContext(conf=conf) as sc:
    hiveCtx = HiveContext(sc)
    tweets = hiveCtx.read.json('input/tweets.json')
    # registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView
    # is its replacement and is also safe to re-run.
    tweets.createOrReplaceTempView("tweets")
    results = hiveCtx.sql("SELECT user.name, text FROM tweets")
    # NOTE(review): the saved output contains all-None rows; presumably some
    # lines of tweets.json lack user/text fields - confirm against the file.
    resultsText = results.rdd.map(lambda row: row.text)
    print(f'\ntweets query: {results.collect()}')
    print(f'\ntweets text: {resultsText.collect()}')


tweets query: [Row(name=None, text=None), Row(name='Holden', text='Nice day out today'), Row(name='Matei', text='Ever nicer here :)'), Row(name=None, text=None)]

tweets text: [None, 'Nice day out today', 'Ever nicer here :)', None]
