In [1]:
import csv
import json
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext

In [2]:
# Environment for the Hadoop/Spark client, set in one call:
# HADOOP_USER_NAME is the user name exported for HDFS access, and
# PYTHON_VERSION is exported alongside it.
os.environ.update(
    {
        "HADOOP_USER_NAME": "hdfs",
        "PYTHON_VERSION": "3.5.2",
    }
)

# Read & Write with HDFS

In [3]:
# Reuse the running SparkContext when one already exists, so that re-running
# this cell does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [4]:
# Small in-memory sample (the integers 1 through 5), turned into an RDD.
data = list(range(1, 6))
distData = sc.parallelize(data)

In [5]:
# Open a text file stored on HDFS as an RDD of lines,
# then fetch its first line to check that the read works.
textFile = sc.textFile(
    name="hdfs://cluster/user/hdfs/test/results1.csv"
)
textFile.first()

'(GAVIN,9)'

In [None]:
# Write the sample RDD back to HDFS as text.
# NOTE(review): this cell has no execution count, so it was apparently never
# run; saveAsTextFile is expected to refuse an already-existing target — confirm.
distData.saveAsTextFile(
    path="hdfs://cluster/user/hdfs/test/distData.csv"
)

# Read & Write with Hive

In [6]:
# Register the Hive metastore URI as a JVM system property, then obtain a
# SparkSession with Hive support enabled.
# NOTE(review): the PySpark docs say setSystemProperty should be invoked before
# a SparkContext is instantiated, yet `sc` already exists here — confirm the
# property is reliably picked up.
SparkContext.setSystemProperty(
    key="hive.metastore.uris",
    value="thrift://nn1:9083",
)
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .getOrCreate()
)

In [7]:
# HiveContext is built on top of a SparkContext, so pass the session's
# underlying context instead of the SparkSession object itself.
# NOTE(review): HiveContext is documented as deprecated since Spark 2.0;
# `spark.sql("show tables")` on the Hive-enabled session should be equivalent.
hive_context = HiveContext(spark.sparkContext)
# List the tables visible through the Hive metastore.
hive_context.sql("show tables").show()

+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
| default|  earthquakes|      false|
| default|         test|      false|
| default| test_titanic|      false|
| default|train_titanic|      false|
+--------+-------------+-----------+



# Other tests

In [8]:
# `spark` is the SparkSession created earlier in the notebook.
# Load the JSON file from HDFS into a DataFrame.
df = spark.read.json(
    path="hdfs://cluster/user/hdfs/test/customers_example.json"
)
# Print the DataFrame's rows to stdout as a table.
df.show()

+--------------------+----------+----------+
|             address|first_name| last_name|
+--------------------+----------+----------+
|[New Orleans,LA,6...|     James|Butterburg|
|[Brighton,MI,4 B ...| Josephine|   Darakjy|
|[Bridgeport,NJ,8 ...|       Art|    Chemel|
+--------------------+----------+----------+



In [9]:
# XML link: load books.xml from HDFS into a DataFrame.
# `format()` expects a data-source name, not a path; the path belongs in
# `load()`. Without `load()` the result is a DataFrameReader, not a DataFrame.
# NOTE(review): assumes the spark-xml package is on the Spark classpath and
# that each record is a <book> element — confirm against the file.
df = (
    spark.read
    .format("com.databricks.spark.xml")
    .option("rowTag", "book")
    .load("hdfs://cluster/user/hdfs/test/books.xml")
)

# Transformations and Actions
On Spark's lazy evaluation: transformations are deferred until an action runs.

In [10]:
# Point `textFile` at the CSV stored on HDFS (one RDD element per line)
# and fetch the first line, which is the column header.
textFile = sc.textFile(
    name="hdfs://cluster/user/hdfs/test/bio-2008-2011.csv"
)
textFile.first()

'Annee,Region,id_departement,Departement,Distributeurs,Importateurs,Producteurs,Preparateurs'

In [11]:
# Action: count() triggers the computation and returns the number of lines in the file.
textFile.count()

401

In [12]:
# Transformation (lazy): parse the lines of each partition with csv.reader,
# so every RDD element becomes a list of column values.
rdd = textFile.mapPartitions(
    lambda partition_lines: csv.reader(partition_lines)
)

In [13]:
# Action: fetch the first 10 parsed rows (the header row is one of them).
rdd.take(10)

[['Annee',
  'Region',
  'id_departement',
  'Departement',
  'Distributeurs',
  'Importateurs',
  'Producteurs',
  'Preparateurs'],
 ['2008', 'ALSACE', '67', 'BAS-RHIN', '38', 's', '152', '96'],
 ['2008', 'ALSACE', '68', 'HAUT-RHIN', '25', 's', '226', '76'],
 ['2008', 'AQUITAINE', '24', 'DORDOGNE', '15', 's', '257', '62'],
 ['2008', 'AQUITAINE', '33', 'GIRONDE', '41', 's', '261', '124'],
 ['2008', 'AQUITAINE', '40', 'LANDES', '15', 's', '108', '48'],
 ['2008', 'AQUITAINE', '47', 'LOT-ET-GARONNE', '20', '3', '299', '52'],
 ['2008', 'AQUITAINE', '64', 'PYRENEES-ATLANTIQUES', '17', '3', '199', '60'],
 ['2008', 'AUVERGNE', '3', 'ALLIER', '12', '', '129', '42'],
 ['2008', 'AUVERGNE', '15', 'CANTAL', '7', '', '77', '16']]

In [14]:
# Keep only the rows whose second column (the region) is ALSACE,
# then fetch up to 10 of them.
(
    rdd
    .filter(lambda row: row[1] == 'ALSACE')
    .take(10)
)

[['2008', 'ALSACE', '67', 'BAS-RHIN', '38', 's', '152', '96'],
 ['2008', 'ALSACE', '68', 'HAUT-RHIN', '25', 's', '226', '76'],
 ['2009', 'ALSACE', '67', 'BAS-RHIN', '53', 's', '183', '112'],
 ['2009', 'ALSACE', '68', 'HAUT-RHIN', '31', 's', '244', '85'],
 ['2010', 'ALSACE', '67', 'BAS-RHIN', '72', '3', '216', '107'],
 ['2010', 'ALSACE', '68', 'HAUT-RHIN', '40', 's', '272', '86'],
 ['2011', 'ALSACE', '67', 'BAS-RHIN', '73', 's', '236', '147'],
 ['2011', 'ALSACE', '68', 'HAUT-RHIN', '60', '', '300', '102']]

In [15]:
# Build (key, value) pairs: the first column (the year) is the key and
# columns 1 through 6 are the value.
# NOTE(review): the slice stops before the last column and the header row is
# still present as a record — confirm both are intended.
rddkey = rdd.map(
    lambda row: (row[0], row[1:7])
)
rddkey.take(10)

[('Annee',
  ['Region',
   'id_departement',
   'Departement',
   'Distributeurs',
   'Importateurs',
   'Producteurs']),
 ('2008', ['ALSACE', '67', 'BAS-RHIN', '38', 's', '152']),
 ('2008', ['ALSACE', '68', 'HAUT-RHIN', '25', 's', '226']),
 ('2008', ['AQUITAINE', '24', 'DORDOGNE', '15', 's', '257']),
 ('2008', ['AQUITAINE', '33', 'GIRONDE', '41', 's', '261']),
 ('2008', ['AQUITAINE', '40', 'LANDES', '15', 's', '108']),
 ('2008', ['AQUITAINE', '47', 'LOT-ET-GARONNE', '20', '3', '299']),
 ('2008', ['AQUITAINE', '64', 'PYRENEES-ATLANTIQUES', '17', '3', '199']),
 ('2008', ['AUVERGNE', '3', 'ALLIER', '12', '', '129']),
 ('2008', ['AUVERGNE', '15', 'CANTAL', '7', '', '77'])]

In [16]:
# Action: count how many records share each key (the first CSV column).
rddkey.countByKey()

defaultdict(int,
            {'2008': 100, '2009': 100, '2010': 100, '2011': 100, 'Annee': 1})

# Wordcount example

In [17]:
# Open the JSON file on HDFS as plain text, one RDD element per line.
jsonFile = sc.textFile(
    name="hdfs://cluster/user/hdfs/test/people.json"
)

In [18]:
# Word count over the raw JSON text:
#   1. turn the punctuation characters : { } , " into spaces and lower-case the line,
#   2. split each line into words,
#   3. pair every word with 1 and sum the pairs per word,
#   4. flip to (count, word) and sort by count, largest first.
wordcounts = (
    jsonFile
    .map(lambda line: line.translate(str.maketrans(':{},"', ' ' * 5)).lower())
    .flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)

In [19]:
# Action: fetch up to 10 (count, word) pairs, highest count first.
wordcounts.take(10)

[(3, 'name'),
 (2, 'age'),
 (1, 'michael'),
 (1, 'justin'),
 (1, 'andy'),
 (1, '19'),
 (1, '30')]

# Dictionaries

In [21]:
### RDD as dictionary?
class AttributeDict(dict):
    """Dictionary whose keys can also be read and written as attributes."""

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails. Missing keys are
        # reported as AttributeError so attribute probes (e.g. by pickle or
        # copy) see the exception type they expect.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        # Attribute assignment stores the value as a dictionary entry.
        self[name] = value


# Transformation (lazy): parse each line of the file with json.loads and wrap
# the result so its fields are reachable as attributes.
rdddict = jsonFile.map(lambda row: AttributeDict(json.loads(row)))