# Exercise 1: Schema on Read

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
dfLog = spark.read.text("data/NASA_access_log_Jul95.gz")

# Load the dataset

In [4]:
#Data Source: http://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
dfLog = spark.read.text("data/NASA_access_log_Jul95.gz")

# Quick inspection of  the data set

In [5]:
# see the schema
dfLog.printSchema()

root
 |-- value: string (nullable = true)



In [6]:
# number of lines
dfLog.count()

1891715

In [7]:
#what's in there? 
dfLog.show(5)

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
+--------------------+
only showing top 5 rows



In [8]:
#a better show?
dfLog.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------

In [9]:
#pandas to the rescue
pd.set_option('max_colwidth', 200)
dfLog.limit(5).toPandas()

Unnamed: 0,value
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179"


# Let' try simple parsing with split

In [10]:
from pyspark.sql.functions import split
dfArrays = dfLog.withColumn("tokenized", split("value"," "))
dfArrays.limit(10).toPandas()

Unnamed: 0,value,tokenized
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","[199.72.81.55, -, -, [01/Jul/1995:00:00:01, -0400], ""GET, /history/apollo/, HTTP/1.0"", 200, 6245]"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","[unicomp6.unicomp.net, -, -, [01/Jul/1995:00:00:06, -0400], ""GET, /shuttle/countdown/, HTTP/1.0"", 200, 3985]"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","[199.120.110.21, -, -, [01/Jul/1995:00:00:09, -0400], ""GET, /shuttle/missions/sts-73/mission-sts-73.html, HTTP/1.0"", 200, 4085]"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:11, -0400], ""GET, /shuttle/countdown/liftoff.html, HTTP/1.0"", 304, 0]"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179","[199.120.110.21, -, -, [01/Jul/1995:00:00:11, -0400], ""GET, /shuttle/missions/sts-73/sts-73-patch-small.gif, HTTP/1.0"", 200, 4179]"
5,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /images/NASA-logosmall.gif HTTP/1.0"" 304 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /images/NASA-logosmall.gif, HTTP/1.0"", 304, 0]"
6,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/video/livevideo.gif HTTP/1.0"" 200 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /shuttle/countdown/video/livevideo.gif, HTTP/1.0"", 200, 0]"
7,"205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/countdown.html HTTP/1.0"" 200 3985","[205.212.115.106, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /shuttle/countdown/countdown.html, HTTP/1.0"", 200, 3985]"
8,"d104.aa.net - - [01/Jul/1995:00:00:13 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","[d104.aa.net, -, -, [01/Jul/1995:00:00:13, -0400], ""GET, /shuttle/countdown/, HTTP/1.0"", 200, 3985]"
9,"129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] ""GET / HTTP/1.0"" 200 7074","[129.94.144.152, -, -, [01/Jul/1995:00:00:13, -0400], ""GET, /, HTTP/1.0"", 200, 7074]"


# Second attempt, let's build a custom parsing UDF 

In [11]:
from pyspark.sql.functions import udf

@udf
def parseUDF(line):
    import re
    PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        return (line, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = match.group(9)
    return {
        "host"          : match.group(1), 
        "client_identd" : match.group(2), 
        "user_id"       : match.group(3), 
        "date_time"     : match.group(4), 
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }

In [12]:
#Let's start from the beginning
dfParsed= dfLog.withColumn("parsed", parseUDF("value"))
dfParsed.limit(3).toPandas()

Unnamed: 0,value,parsed
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","{response_code=200, protocol=HTTP/1.0, endpoint=/history/apollo/, content_size=6245, method=GET, date_time=01/Jul/1995:00:00:01 -0400, user_id=-, host=199.72.81.55, client_identd=-}"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/, content_size=3985, method=GET, date_time=01/Jul/1995:00:00:06 -0400, user_id=-, host=unicomp6.unicomp.net, client_identd=-}"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","{response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/missions/sts-73/mission-sts-73.html, content_size=4085, method=GET, date_time=01/Jul/1995:00:00:09 -0400, user_id=-, host=199.120.110.21, c..."


In [13]:
dfParsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: string (nullable = true)



# Third attempt, let's fix our UDF

In [14]:
#from pyspark.sql.functions import udf # already imported
from pyspark.sql.types import MapType, StringType

@udf(MapType(StringType(),StringType()))
def parseUDFbetter(line):
    import re
    PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        return (line, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = match.group(9)
    return {
        "host"          : match.group(1), 
        "client_identd" : match.group(2), 
        "user_id"       : match.group(3), 
        "date_time"     : match.group(4), 
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }

In [15]:
#Let's start from the beginning
dfParsed= dfLog.withColumn("parsed", parseUDFbetter("value"))
dfParsed.limit(3).toPandas()

Unnamed: 0,value,parsed
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72..."
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'uni..."
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/mission-sts-73.html', 'content_size': '4085', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:09 -0400', 'us..."


In [16]:
#Let's start from the beginning
dfParsed= dfLog.withColumn("parsed", parseUDFbetter("value"))
dfParsed.limit(3).toPandas()

Unnamed: 0,value,parsed
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72..."
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'uni..."
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/mission-sts-73.html', 'content_size': '4085', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:09 -0400', 'us..."


In [17]:
#Bingo!! we'got a column of type map with the fields parsed
dfParsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [18]:
dfParsed.select("parsed").limit(3).toPandas()

Unnamed: 0,parsed
0,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72..."
1,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'uni..."
2,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/mission-sts-73.html', 'content_size': '4085', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:09 -0400', 'us..."


# Let's build separate columns

In [19]:
dfParsed.selectExpr("parsed['host'] as host").limit(5).show(5)

+--------------------+
|                host|
+--------------------+
|        199.72.81.55|
|unicomp6.unicomp.net|
|      199.120.110.21|
|  burger.letters.com|
|      199.120.110.21|
+--------------------+



In [20]:
dfParsed.selectExpr(["parsed['host']", "parsed['date_time']"]).show(5)

+--------------------+--------------------+
|        parsed[host]|   parsed[date_time]|
+--------------------+--------------------+
|        199.72.81.55|01/Jul/1995:00:00...|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|
|      199.120.110.21|01/Jul/1995:00:00...|
|  burger.letters.com|01/Jul/1995:00:00...|
|      199.120.110.21|01/Jul/1995:00:00...|
+--------------------+--------------------+
only showing top 5 rows



In [21]:
fields = ["host", "client_identd","user_id", "date_time", "method", "endpoint", "protocol", "response_code", "content_size"]
exprs = [ "parsed['{}'] as {}".format(field,field) for field in fields]
exprs


["parsed['host'] as host",
 "parsed['client_identd'] as client_identd",
 "parsed['user_id'] as user_id",
 "parsed['date_time'] as date_time",
 "parsed['method'] as method",
 "parsed['endpoint'] as endpoint",
 "parsed['protocol'] as protocol",
 "parsed['response_code'] as response_code",
 "parsed['content_size'] as content_size"]

In [22]:
dfClean = dfParsed.selectExpr(*exprs)
dfClean.limit(5).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179


## Popular hosts

In [23]:
from pyspark.sql.functions import desc
dfClean.groupBy("host").count().orderBy(desc("count")).limit(10).toPandas()

Unnamed: 0,host,count
0,piweba3y.prodigy.com,17572
1,piweba4y.prodigy.com,11591
2,piweba1y.prodigy.com,9868
3,alyssa.prodigy.com,7852
4,siltb10.orl.mmc.com,7573
5,piweba2y.prodigy.com,5922
6,edams.ksc.nasa.gov,5434
7,163.206.89.4,4906
8,news.ti.com,4863
9,disarray.demon.co.uk,4353


## Popular content

In [24]:
from pyspark.sql.functions import desc
dfClean.groupBy("endpoint").count().orderBy(desc("count")).limit(10).toPandas()

Unnamed: 0,endpoint,count
0,/images/NASA-logosmall.gif,111330
1,/images/KSC-logosmall.gif,89638
2,/images/MOSAIC-logosmall.gif,60467
3,/images/USA-logosmall.gif,60013
4,/images/WORLD-logosmall.gif,59488
5,/images/ksclogo-medium.gif,58801
6,/images/launch-logo.gif,40871
7,/shuttle/countdown/,40278
8,/ksc.html,40226
9,/images/ksclogosmall.gif,33585


## Large Files

In [25]:
dfClean.createOrReplaceTempView("cleanlog")
spark.sql("""
select endpoint, content_size
from cleanlog 
order by content_size desc
""").limit(10).toPandas()

Unnamed: 0,endpoint,content_size
0,/images/cdrom-1-95/img0007.jpg,99981
1,/shuttle/missions/sts-71/movies/sts-71-launch.mpg,999424
2,/shuttle/missions/sts-71/movies/sts-71-launch.mpg,999424
3,/history/apollo/apollo-13/images/index.gif,99942
4,/history/apollo/apollo-13/images/index.gif,99942
5,/history/apollo/apollo-13/images/index.gif,99942
6,/history/apollo/apollo-13/images/index.gif,99942
7,/history/apollo/apollo-13/images/index.gif,99942
8,/history/apollo/apollo-13/images/index.gif,99942
9,/history/apollo/apollo-13/images/index.gif,99942


In [26]:
from pyspark.sql.functions import expr
dfCleanTyped = dfClean.withColumn("content_size_bytes", expr("cast(content_size  as int)"))
dfCleanTyped.limit(5).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size,content_size_bytes
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,3985
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085,4085
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0,0
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179,4179


In [27]:
dfCleanTyped.createOrReplaceTempView("cleantypedlog")
spark.sql("""
select endpoint, content_size
from cleantypedlog 
order by content_size_bytes desc
""").limit(10).toPandas()

Unnamed: 0,endpoint,content_size
0,/shuttle/countdown/video/livevideo.jpeg,6823936
1,/statistics/1995/bkup/Mar95_full.html,3155499
2,/statistics/1995/bkup/Mar95_full.html,3155499
3,/statistics/1995/bkup/Mar95_full.html,3155499
4,/statistics/1995/bkup/Mar95_full.html,3155499
5,/statistics/1995/bkup/Mar95_full.html,3155499
6,/statistics/1995/bkup/Mar95_full.html,3155499
7,/statistics/1995/bkup/Mar95_full.html,3155499
8,/statistics/1995/Jun/Jun95_reverse_domains.html,2973350
9,/statistics/1995/Jun/Jun95_reverse_domains.html,2973350


In [196]:
# Left for you, clean the date column :)
# 1- Create a udf that parses that weird format,
# 2- Create a new column with a data tiem string that spark would understand
# 3- Add a new date-time column properly typed
# 4- Print your schema

# Exercise 2: Advanced Analytics NLP

In [1]:
!pip install spark-nlp==1.7.3

Collecting spark-nlp==1.7.3
[?25l  Downloading https://files.pythonhosted.org/packages/7e/c1/3ec550fbc22efdcac013a301f74d6c904ec545bef291b414be90d900d1d8/spark_nlp-1.7.3-py2.py3-none-any.whl (72.8MB)
[K    100% |████████████████████████████████| 72.8MB 139kB/s ta 0:00:011  1% |▎                               | 757kB 12.5MB/s eta 0:00:06    1% |▌                               | 1.2MB 14.7MB/s eta 0:00:05    2% |█                               | 2.1MB 6.9MB/s eta 0:00:11    3% |█                               | 2.5MB 5.3MB/s eta 0:00:14    7% |██▎                             | 5.2MB 12.6MB/s eta 0:00:06    7% |██▌                             | 5.6MB 7.2MB/s eta 0:00:10    11% |███▊                            | 8.5MB 9.2MB/s eta 0:00:07    13% |████▎                           | 9.7MB 8.0MB/s eta 0:00:08    14% |████▋                           | 10.5MB 9.0MB/s eta 0:00:07    19% |██████▎                         | 14.3MB 12.8MB/s eta 0:00:05    20% |██████▋                         | 15.1M

In [2]:
import pandas as pd
pd.set_option('max_colwidth', 800)

# Create a spark context that includes a 3rd party jar for NLP

In [3]:
#jarPath = "spark-nlp-assembly-1.7.3.jar"

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.8.2") \
    .getOrCreate()
spark

# Read multiple files in a dir as one Dataframe

In [None]:
dataPath = "./data/reddit/*.json"
df = spark.read.json(dataPath)
print(df.count())
df.printSchema()

# Deal with Struct type to query subfields 

In [5]:
title = "data.title"
author = "data.author"
dfAuthorTilte = df.select(title, author)
dfAuthorTilte.limit(5).toPandas()

Unnamed: 0,title,author
0,"Microsoft Corp said it has discovered hacking targeting democratic institutions, think tanks, and non-profit organizations in Europe.",jaykirsch
1,Deutsche Bank reportedly planned to extend the dates of $340 million in loans to Trump Organization to avoid a potential nightmare of chasing a sitting president for cash,canuck_burger
2,"Iranian ""morality police"" were forced to fire warning shots when a crowd intervened to prevent them from arresting two women for not wearing a hijab. The incident occurred in Tehran's northeastern Narmak neighbourhood on Friday night, and ended with a mob tearing the door off a police vehicle.",honolulu_oahu_mod
3,"Trump administration 'pushing Saudi nuclear deal' which could benefit company linked to Jared Kushner - Senior Trump administration officials pushed a project to share nuclear power technology with Saudi Arabia over the objections of ethics officials, according to a congressional report",madam1
4,"NASA Happily Reports the Earth is Greener, With More Trees Than 20 Years Ago–and It's Thanks to China, India",purplexxx


# Try to implement the equivalent of flatMap in dataframes

In [7]:
import pyspark.sql.functions as F

dfWordCount = df.select(F.explode(F.split(title,"\\s+")).alias("word")).groupBy("word").count().orderBy(F.desc("count"))
dfWordCount.limit(10).toPandas()

Unnamed: 0,word,count
0,to,58
1,the,46
2,of,42
3,in,41
4,a,25
5,for,20
6,and,19
7,from,12
8,on,11
9,over,10


# Use an NLP libary to do Part-of-Speech Tagging

In [9]:
from com.johnsnowlabs.nlp.pretrained.pipeline.en import BasicPipeline as bp
dfAnnotated = bp.annotate(dfAuthorTilte, "title")
dfAnnotated.printSchema()

root
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- normal: array (nullable = true)
 |    |-- element: struct (contains

## Deal with Map type to query subfields

In [10]:
dfPos = dfAnnotated.select("text", "pos.metadata", "pos.result")
dfPos.limit(5).toPandas()

Unnamed: 0,text,metadata,result
0,"Microsoft Corp said it has discovered hacking targeting democratic institutions, think tanks, and non-profit organizations in Europe.","[{'word': 'Microsoft'}, {'word': 'Corp'}, {'word': 'said'}, {'word': 'it'}, {'word': 'has'}, {'word': 'discovered'}, {'word': 'hacking'}, {'word': 'targeting'}, {'word': 'democratic'}, {'word': 'institutions'}, {'word': 'think'}, {'word': 'tanks'}, {'word': 'and'}, {'word': 'nonprofit'}, {'word': 'organizations'}, {'word': 'in'}, {'word': 'Europe'}]","[NNP, NNP, VBD, PRP, VBZ, VBN, VBG, VBG, JJ, NNS, VBP, NNS, CC, NN, NNS, IN, NNP]"
1,Deutsche Bank reportedly planned to extend the dates of $340 million in loans to Trump Organization to avoid a potential nightmare of chasing a sitting president for cash,"[{'word': 'Deutsche'}, {'word': 'Bank'}, {'word': 'reportedly'}, {'word': 'planned'}, {'word': 'to'}, {'word': 'extend'}, {'word': 'the'}, {'word': 'dates'}, {'word': 'of'}, {'word': 'million'}, {'word': 'in'}, {'word': 'loans'}, {'word': 'to'}, {'word': 'Trump'}, {'word': 'Organization'}, {'word': 'to'}, {'word': 'avoid'}, {'word': 'a'}, {'word': 'potential'}, {'word': 'nightmare'}, {'word': 'of'}, {'word': 'chasing'}, {'word': 'a'}, {'word': 'sitting'}, {'word': 'president'}, {'word': 'for'}, {'word': 'cash'}]","[NNP, NNP, RB, VBD, TO, VB, DT, NNS, IN, CD, IN, NNS, TO, NNP, NNP, TO, VB, DT, JJ, NN, IN, VBG, DT, VBG, NN, IN, NN]"
2,"Iranian ""morality police"" were forced to fire warning shots when a crowd intervened to prevent them from arresting two women for not wearing a hijab. The incident occurred in Tehran's northeastern Narmak neighbourhood on Friday night, and ended with a mob tearing the door off a police vehicle.","[{'word': 'Iranian'}, {'word': 'morality'}, {'word': 'police'}, {'word': 'were'}, {'word': 'forced'}, {'word': 'to'}, {'word': 'fire'}, {'word': 'warning'}, {'word': 'shots'}, {'word': 'when'}, {'word': 'a'}, {'word': 'crowd'}, {'word': 'intervened'}, {'word': 'to'}, {'word': 'prevent'}, {'word': 'them'}, {'word': 'from'}, {'word': 'arresting'}, {'word': 'two'}, {'word': 'women'}, {'word': 'for'}, {'word': 'not'}, {'word': 'wearing'}, {'word': 'a'}, {'word': 'hijab'}, {'word': 'The'}, {'word': 'incident'}, {'word': 'occurred'}, {'word': 'in'}, {'word': 'Tehran'}, {'word': 's'}, {'word': 'northeastern'}, {'word': 'Narmak'}, {'word': 'neighbourhood'}, {'word': 'on'}, {'word': 'Friday'}, {'word': 'night'}, {'word': 'and'}, {'word': 'ended'}, {'word': 'with'}, {'word': 'a'}, {'word': 'mob'...","[JJ, NN, NN, VBD, VBN, TO, VB, NN, NNS, WRB, DT, NN, VBD, TO, VB, PRP, IN, VBG, CD, NNS, IN, RB, VBG, DT, NN, DT, NN, VBD, IN, NNP, VBZ, JJ, NNP, NN, IN, NNP, NN, CC, VBD, IN, DT, NN, VBG, DT, NN, RP, DT, NN, NN]"
3,"Trump administration 'pushing Saudi nuclear deal' which could benefit company linked to Jared Kushner - Senior Trump administration officials pushed a project to share nuclear power technology with Saudi Arabia over the objections of ethics officials, according to a congressional report","[{'word': 'Trump'}, {'word': 'administration'}, {'word': 'pushing'}, {'word': 'Saudi'}, {'word': 'nuclear'}, {'word': 'deal'}, {'word': 'which'}, {'word': 'could'}, {'word': 'benefit'}, {'word': 'company'}, {'word': 'linked'}, {'word': 'to'}, {'word': 'Jared'}, {'word': 'Kushner'}, {'word': 'Senior'}, {'word': 'Trump'}, {'word': 'administration'}, {'word': 'officials'}, {'word': 'pushed'}, {'word': 'a'}, {'word': 'project'}, {'word': 'to'}, {'word': 'share'}, {'word': 'nuclear'}, {'word': 'power'}, {'word': 'technology'}, {'word': 'with'}, {'word': 'Saudi'}, {'word': 'Arabia'}, {'word': 'over'}, {'word': 'the'}, {'word': 'objections'}, {'word': 'of'}, {'word': 'ethics'}, {'word': 'officials'}, {'word': 'according'}, {'word': 'to'}, {'word': 'a'}, {'word': 'congressional'}, {'word': 're...","[NNP, NN, VBG, NNP, NN, NN, WDT, MD, VB, NN, VBN, TO, NNP, NNP, NNP, NNP, NN, NNS, VBD, DT, NN, TO, VB, JJ, NN, NN, IN, NNP, NNP, IN, DT, NNS, IN, NNS, NNS, VBG, TO, DT, JJ, NN]"
4,"NASA Happily Reports the Earth is Greener, With More Trees Than 20 Years Ago–and It's Thanks to China, India","[{'word': 'NASA'}, {'word': 'Happily'}, {'word': 'Reports'}, {'word': 'the'}, {'word': 'Earth'}, {'word': 'is'}, {'word': 'Greener'}, {'word': 'With'}, {'word': 'More'}, {'word': 'Trees'}, {'word': 'Than'}, {'word': 'Years'}, {'word': 'Agoand'}, {'word': 'It'}, {'word': 's'}, {'word': 'Thanks'}, {'word': 'to'}, {'word': 'China'}, {'word': 'India'}]","[NNP, NNP, NNS, DT, NNP, VBZ, NNP, IN, JJR, NNP, IN, NNS, NNP, PRP, VBZ, NNS, TO, NNP, NNP]"


In [11]:
dfPos= dfAnnotated.select(F.explode("pos").alias("pos"))
dfPos.printSchema()
dfPos.toPandas()

root
 |-- pos: struct (nullable = true)
 |    |-- annotatorType: string (nullable = true)
 |    |-- begin: integer (nullable = false)
 |    |-- end: integer (nullable = false)
 |    |-- result: string (nullable = true)
 |    |-- metadata: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)



Unnamed: 0,pos
0,"(pos, 0, 8, NNP, {'word': 'Microsoft'})"
1,"(pos, 10, 13, NNP, {'word': 'Corp'})"
2,"(pos, 15, 18, VBD, {'word': 'said'})"
3,"(pos, 20, 21, PRP, {'word': 'it'})"
4,"(pos, 23, 25, VBZ, {'word': 'has'})"
5,"(pos, 27, 36, VBN, {'word': 'discovered'})"
6,"(pos, 38, 44, VBG, {'word': 'hacking'})"
7,"(pos, 46, 54, VBG, {'word': 'targeting'})"
8,"(pos, 56, 65, JJ, {'word': 'democratic'})"
9,"(pos, 67, 78, NNS, {'word': 'institutions'})"


## Keep only proper nouns NNP or NNPS

In [12]:
nnpFilter = "pos.result = 'NNP' or pos.result = 'NNPS' "
dfNNP = dfPos.where(nnpFilter)
dfNNP.limit(10).toPandas()

Unnamed: 0,pos
0,"(pos, 0, 8, NNP, {'word': 'Microsoft'})"
1,"(pos, 10, 13, NNP, {'word': 'Corp'})"
2,"(pos, 126, 131, NNP, {'word': 'Europe'})"
3,"(pos, 0, 7, NNP, {'word': 'Deutsche'})"
4,"(pos, 9, 12, NNP, {'word': 'Bank'})"
5,"(pos, 81, 85, NNP, {'word': 'Trump'})"
6,"(pos, 87, 98, NNP, {'word': 'Organization'})"
7,"(pos, 175, 180, NNP, {'word': 'Tehran'})"
8,"(pos, 197, 202, NNP, {'word': 'Narmak'})"
9,"(pos, 221, 226, NNP, {'word': 'Friday'})"


## Extract columns form a map in a col

In [13]:
dfWordTag = dfNNP.selectExpr("pos.metadata['word'] as word", "pos.result as tag")
dfWordTag.limit(10).toPandas()

Unnamed: 0,word,tag
0,Microsoft,NNP
1,Corp,NNP
2,Europe,NNP
3,Deutsche,NNP
4,Bank,NNP
5,Trump,NNP
6,Organization,NNP
7,Tehran,NNP
8,Narmak,NNP
9,Friday,NNP


In [14]:
from pyspark.sql.functions import desc
dfWordTag.groupBy("word").count().orderBy(desc("count")).show()

+--------+-----+
|    word|count|
+--------+-----+
|      US|   14|
|   Trump|    9|
|   Saudi|    8|
|   Putin|    7|
|  Russia|    6|
|  Europe|    5|
|  Arabia|    5|
|Catholic|    4|
|      UK|    4|
|Vladimir|    4|
|   China|    3|
| Germany|    3|
|    Pope|    3|
|   Egypt|    3|
|   South|    3|
|   House|    3|
|  Church|    3|
|National|    2|
|  Africa|    2|
|   India|    2|
+--------+-----+
only showing top 20 rows



# Exercise 3: Data Lake on S3

In [1]:
from pyspark.sql import SparkSession
import os
import configparser

# Make sure that your AWS credentials are loaded as env vars

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('aws/credentials.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [None]:
## a little bit slow

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

#### 以上的几步，包括设置credentials和初始化spark，都是不需要的，如果我们在EMRcluster的Notebook里面运行；
#### 现在是因为我们在local machine上运行所以需要这些设置

# Load data from S3

In [4]:
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv")

In [5]:
df.printSchema()
df.show(5)

root
 |-- _c0: string (nullable = true)

+--------------------+
|                 _c0|
+--------------------+
|payment_id;custom...|
|16050;269;2;7;1.9...|
|16051;269;1;98;0....|
|16052;269;2;678;6...|
|16053;269;2;703;0...|
+--------------------+
only showing top 5 rows



# Infer schema, fix header and separator

In [6]:
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";", inferSchema=True, header=True)

In [7]:
df.printSchema()
df.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: string (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



# Fix the data yourself 

In [8]:
import  pyspark.sql.functions as F
dfPayment = df.withColumn("payment_date", F.to_timestamp("payment_date"))
dfPayment.printSchema()
dfPayment.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



# Extract the month

In [9]:
dfPayment = dfPayment.withColumn("month", F.month("payment_date"))
dfPayment.show(5)

+----------+-----------+--------+---------+------+--------------------+-----+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|month|
+----------+-----------+--------+---------+------+--------------------+-----+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|    1|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|    1|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|    1|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|    1|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|    1|
+----------+-----------+--------+---------+------+--------------------+-----+
only showing top 5 rows



# Computer aggregate revenue per month

In [10]:
dfPayment.createOrReplaceTempView("payment")
spark.sql("""
    SELECT month, sum(amount) as revenue
    FROM payment
    GROUP by month
    order by revenue desc
""").show()

+-----+------------------+
|month|           revenue|
+-----+------------------+
|    4|28559.460000003943|
|    3|23886.560000002115|
|    2| 9631.879999999608|
|    1| 4824.429999999856|
|    5|  514.180000000001|
+-----+------------------+



# Fix the schema

In [34]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
paymentSchema = R([
    Fld("payment_id",Int()),
    Fld("customer_id",Int()),
    Fld("staff_id",Int()),
    Fld("rental_id",Int()),
    Fld("amount",Dbl()),
    Fld("payment_date",Date()),
])

In [35]:
dfPaymentWithSchema = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";", schema=paymentSchema, header=True)


In [37]:
dfPaymentWithSchema.printSchema()
df.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: date (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



### here we can extract the month directly, no need to use F.moth to create a new column

In [39]:
dfPaymentWithSchema.createOrReplaceTempView("payment")
spark.sql("""
    SELECT month(payment_date) as m, sum(amount) as revenue
    FROM payment
    GROUP by m
    order by revenue desc
""").show()

+---+------------------+
|  m|           revenue|
+---+------------------+
|  4|28559.460000003943|
|  3|23886.560000002115|
|  2| 9631.879999999608|
|  1| 4824.429999999856|
|  5|  514.180000000001|
+---+------------------+

