In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .master("local") \
    .appName('jupyter-pyspark') \
        .config("hive.metastore.uris", "thrift://hive-metastore:9083")\
        .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

21/12/08 18:16:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# HDFS

http://localhost:50070/  (Outside the docker environment)

In [3]:
# Read local
df = spark.read.csv("/home/jovyan/datasets/customers/customers.csv", header=True)
df.sample(0.2).toPandas()

Unnamed: 0,First,Last,Email,Gender,Last IP Address,City,State,Total Orders,Total Purchased,Months Customer
0,Barb,Barion,bbarion@superrito.com,F,38.68.15.223,Dallas,TX,4,1590,1
1,Barry,DeHatchett,bdehatchett@dayrep.com,M,23.192.215.78,Boston,MA,1,15,6
2,Bill,Melator,bmelator@einrot.com,M,24.11.125.10,Orem,UT,9,6090,35
3,Candi,Cayne,ccayne@rhyta.com,F,24.39.14.15,Portland,ME,1,620,2
4,Justin,Case,jcase@dayrep.com,M,23.192.215.44,Boston,MA,3,1050,1
5,Lee,Hvmeehom,lhvmeehom@einrot.com,F,215.82.23.2,Columbus,OH,9,207,18
6,Rowan,Deboat,rdeboat@dayrep.com,M,23.84.32.22,Topeka,KS,1,3500,42
7,Ray,Ovlight,rovlight@dayrep.com,M,74.111.18.59,Syracuse,NY,6,125,42
8,Tim,Pani,tpani@superrito.com,M,23.84.132.226,Buffalo,NY,0,0,1


In [4]:
# Write to HDFS
df.write.csv("webhdfs://namenode:50070/user/demo/customers/")

                                                                                

In [5]:
# Read back from HDFS
spark.read.csv("webhdfs://namenode:50070/user/demo/customers/", header=False).show()

+------+----------+--------------------+---+---------------+-----------+---+---+----+---+
|   _c0|       _c1|                 _c2|_c3|            _c4|        _c5|_c6|_c7| _c8|_c9|
+------+----------+--------------------+---+---------------+-----------+---+---+----+---+
|    Al|    Fresco|  afresco@dayrep.com|  M|  74.111.18.161|   Syracuse| NY|  1|  45|  1|
|  Abby|      Kuss|     akuss@rhyta.com|  F|  23.80.125.101|    Phoenix| AZ|  1|  25|  2|
| Arial|     Photo|   aphoto@dayrep.com|  F|     24.0.14.56|     Newark| NJ|  1| 680|  1|
| Bette|     Alott|    balott@rhyta.com|  F| 56.216.127.219|    Raleigh| NC|  6| 560| 18|
|  Barb|    Barion|bbarion@superrito...|  F|   38.68.15.223|     Dallas| TX|  4|1590|  1|
| Barry|DeHatchett|bdehatchett@dayre...|  M|  23.192.215.78|     Boston| MA|  1|  15|  6|
|  Bill|   Melator| bmelator@einrot.com|  M|   24.11.125.10|       Orem| UT|  9|6090| 35|
| Candi|     Cayne|    ccayne@rhyta.com|  F|    24.39.14.15|   Portland| ME|  1| 620|  2|
| Carol|  

In [8]:
# Upload to HDFS
df = spark.read.options(inferSchema=True,delimiter='\t').csv("/home/jovyan/datasets/grades/*.tsv", header=False)
df.write.options(inferSchema=True,delimiter='\t').csv("webhdfs://namenode:50070/user/demo/grades/")

### HDFS API

You can maniuplate HDFS with a REST API:  
https://hadoop.apache.org/docs/r2.7.4/hadoop-project-dist/hadoop-hdfs/WebHDFS.html

In [22]:
# list folders in /user
!curl -i  "http://namenode:50070/webhdfs/v1/user?op=LISTSTATUS"

HTTP/1.1 200 OK
[1mCache-Control[0m: no-cache
[1mExpires[0m: Wed, 08 Dec 2021 18:28:58 GMT
[1mDate[0m: Wed, 08 Dec 2021 18:28:58 GMT
[1mPragma[0m: no-cache
[1mExpires[0m: Wed, 08 Dec 2021 18:28:58 GMT
[1mDate[0m: Wed, 08 Dec 2021 18:28:58 GMT
[1mPragma[0m: no-cache
[1mContent-Type[0m: application/json
[1mTransfer-Encoding[0m: chunked
[1mServer[0m: Jetty(6.1.26)

{"FileStatuses":{"FileStatus":[
{"accessTime":0,"blockSize":0,"childrenNum":2,"fileId":16394,"group":"supergroup","length":0,"modificationTime":1638987712433,"owner":"jovyan","pathSuffix":"demo","permission":"755","replication":0,"storagePolicy":0,"type":"DIRECTORY"},
{"accessTime":0,"blockSize":0,"childrenNum":1,"fileId":16388,"group":"supergroup","length":0,"modificationTime":1638987350742,"owner":"root","pathSuffix":"hive","permission":"755","replication":0,"storagePolicy":0,"type":"DIRECTORY"}
]}}


## Python HDFS Module

A Python (not spark) module for interacting with the HDFS filesystem

https://hdfscli.readthedocs.io/en/latest/quickstart.html

In [27]:
!pip install -q hdfs

In [31]:
from hdfs import InsecureClient
client = InsecureClient('http://namenode:50070', user='root')
client.content("/user/demo/grades/"), client.list("/user/demo/grades/")

({'directoryCount': 3,
  'fileCount': 2,
  'length': 404,
  'quota': -1,
  'spaceConsumed': 1212,
  'spaceQuota': -1},
 ['*.csv',
  '*.tsv',
  '_SUCCESS',
  'part-00000-870587d4-c231-44d9-a4cb-03a4d2f4a476-c000.csv'])

# Hive

In [9]:
spark.sql("show tables;").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



## Internal Tables

Internal tables are stored in `/user/hive/warehouse`

In [10]:
# read Local Data
df = spark.read.option("multiline","true").json("/home/jovyan/datasets/json-samples/stocks.json")
df.toPandas()

Unnamed: 0,price,symbol
0,126.82,AAPL
1,3098.12,AMZN
2,251.11,FB
3,1725.05,GOOG
4,128.39,IBM
5,212.55,MSFT
6,78.0,NET
7,497.0,NFLX
8,823.8,TSLA
9,45.11,TWTR


In [11]:
# Hive Internal Table Bulk import

spark.sql("DROP TABLE IF EXISTS default.stocks;")
df.createOrReplaceTempView("tmp_stocks") 
spark.sql("""
CREATE TABLE IF NOT EXISTS default.stocks 
    AS select * from tmp_stocks
""")
spark.sql("select * from default.stocks").show()

+-------+------+
|  price|symbol|
+-------+------+
| 126.82|  AAPL|
|3098.12|  AMZN|
| 251.11|    FB|
|1725.05|  GOOG|
| 128.39|   IBM|
| 212.55|  MSFT|
|   78.0|   NET|
|  497.0|  NFLX|
|  823.8|  TSLA|
|  45.11|  TWTR|
+-------+------+



In [12]:
# Hive Internal Table create stuff.

spark.sql("""drop table if exists default.department""")
spark.sql("""CREATE TABLE default.department(
department_id int ,
department_name string
)    
""")
spark.sql("""
INSERT INTO default.department values (101,"Oncology")    
""")
spark.sql("""
INSERT INTO default.department values (102,"Hematology")    
""")
spark.sql("SELECT * FROM default.department").show()

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|          101|       Oncology|
|          102|     Hematology|
+-------------+---------------+



## External Tables

External tables exist in the metastore only and point to an HDFS loocation

In [13]:
# Create a database
spark.sql("CREATE DATABASE IF NOT EXISTS ischool")
spark.sql("show databases;").show()

+---------+
|namespace|
+---------+
|  default|
|  ischool|
+---------+



In [16]:
# external table
spark.sql("drop table if exists ischool.grades")
spark.sql("""
create external table ischool.grades (
  year int,
  semester string,
  course string,
  credits int,
  grade string
) 
row format delimited 
fields terminated by '\t' 
location  'hdfs:///user/demo/grades/*.csv'
""")
spark.sql("select * from ischool.grades").show()

+----+--------+------+-------+-----+
|year|semester|course|credits|grade|
+----+--------+------+-------+-----+
|2016|    Fall|IST346|      3|    A|
|2016|    Fall|CHE111|      4|   A-|
|2016|    Fall|PSY120|      3|   B+|
|2016|    Fall|IST256|      3|    A|
|2016|    Fall|ENG121|      3|   B+|
|2015|    Fall|IST101|      1|    A|
|2015|    Fall|IST195|      3|    A|
|2015|    Fall|IST233|      3|   B+|
|2015|    Fall|SOC101|      3|   A-|
|2015|    Fall|MAT221|      3|    C|
|2016|  Spring|GEO110|      3|   B+|
|2016|  Spring|MAT222|      3|    A|
|2016|  Spring|SOC121|      3|   C+|
|2016|  Spring|BIO240|      3|   B-|
|2017|  Spring|IST462|      3|    A|
|2017|  Spring|MAT411|      3|    C|
|2017|  Spring|SOC422|      3|   B-|
|2017|  Spring|ENV201|      3|   A-|
+----+--------+------+-------+-----+



In [17]:
spark.sql("SELECT * FROM default.department").show()

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|          101|       Oncology|
|          102|     Hematology|
+-------------+---------------+



In [18]:
spark.sql("select * from ischool.grades").show()

+----+--------+------+-------+-----+
|year|semester|course|credits|grade|
+----+--------+------+-------+-----+
|2016|    Fall|IST346|      3|    A|
|2016|    Fall|CHE111|      4|   A-|
|2016|    Fall|PSY120|      3|   B+|
|2016|    Fall|IST256|      3|    A|
|2016|    Fall|ENG121|      3|   B+|
|2015|    Fall|IST101|      1|    A|
|2015|    Fall|IST195|      3|    A|
|2015|    Fall|IST233|      3|   B+|
|2015|    Fall|SOC101|      3|   A-|
|2015|    Fall|MAT221|      3|    C|
|2016|  Spring|GEO110|      3|   B+|
|2016|  Spring|MAT222|      3|    A|
|2016|  Spring|SOC121|      3|   C+|
|2016|  Spring|BIO240|      3|   B-|
|2017|  Spring|IST462|      3|    A|
|2017|  Spring|MAT411|      3|    C|
|2017|  Spring|SOC422|      3|   B-|
|2017|  Spring|ENV201|      3|   A-|
+----+--------+------+-------+-----+

