# Problem 2 - Working with SparkSQL (5 points)

This is an interactive PySpark session. Remember that when you open this notebook the `SparkContext` and `SparkSession` are already created, and they are in the `sc` and `spark` variables, respectively. You can run the following two cells to make sure that the Kernel is active.

**Do not insert any cells other than the ones provided.**

In [1]:
sc

In [2]:
spark

## Quazyilx again!

Yes, you remember it. As a reminder, here is the description of the files.

The quazyilx has been malfunctioning, and occasionally generates output with a `-1` for all four measurements, like this:

    2015-12-10T08:40:10Z fnard:-1 fnok:-1 cark:-1 gnuck:-1

There are four different versions of the _quazyilx_ file, each of a different size. As you can see in the output below, the file sizes are 50MB (1,000,000 rows), 4.8GB (100,000,000 rows), 18GB (369,865,098 rows) and 36.7GB (752,981,134 rows). The only difference is the number of records; the file structure is the same.

```
[hadoop@ip-172-31-1-240 ~]$ hadoop fs -ls s3://bigdatateaching/quazyilx/
Found 4 items
-rw-rw-rw-   1 hadoop hadoop    52443735 2018-01-25 15:37 s3://bigdatateaching/quazyilx/quazyilx0.txt
-rw-rw-rw-   1 hadoop hadoop  5244417004 2018-01-25 15:37 s3://bigdatateaching/quazyilx/quazyilx1.txt
-rw-rw-rw-   1 hadoop hadoop 19397230888 2018-01-25 15:38 s3://bigdatateaching/quazyilx/quazyilx2.txt
-rw-rw-rw-   1 hadoop hadoop 39489364082 2018-01-25 15:41 s3://bigdatateaching/quazyilx/quazyilx3.txt
```
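As a quick check of the sizes quoted above, the byte counts from the listing can be converted to binary megabytes and gigabytes and divided by the row counts. This is only an arithmetic sketch; the dictionary and variable names are illustrative and not part of the assignment.

```python
# Byte counts copied from the `hadoop fs -ls` listing; row counts from the text above.
files = {
    "quazyilx0.txt": (52443735, 1000000),
    "quazyilx1.txt": (5244417004, 100000000),
    "quazyilx2.txt": (19397230888, 369865098),
    "quazyilx3.txt": (39489364082, 752981134),
}

MIB = 2 ** 20  # bytes in one binary megabyte
GIB = 2 ** 30  # bytes in one binary gigabyte

per_row = {}
for name, (size_bytes, rows) in sorted(files.items()):
    per_row[name] = size_bytes / float(rows)
    print("{}: {:.1f} MiB = {:.2f} GiB, {:.2f} bytes per row".format(
        name, size_bytes / float(MIB), size_bytes / float(GIB), per_row[name]))
```

Every file averages roughly 52 bytes per row, which is consistent with the files sharing one structure and differing only in the number of records.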

You will use Spark and SparkSQL to create a Spark DataFrame and then run some analysis on the files using SparkSQL queries.

First, in the following cell, create an RDD called `quazyilx` that reads the `quazyilx1.txt` file from S3.

In [3]:
quazyilx = sc.textFile("s3://bigdatateaching/quazyilx/quazyilx1.txt")

In the next cell, look at the first 50 elements of `quazyilx` to make sure everything is working correctly. This should take a few seconds.

In [4]:
quazyilx.take(50)

[u'2000-01-01 00:00:03 fnard:7 fnok:8 cark:19 gnuck:25',
 u'2000-01-01 00:00:08 fnard:14 fnok:19 cark:16 gnuck:37',
 u'2000-01-01 00:00:17 fnard:12 fnok:11 cark:12 gnuck:8',
 u'2000-01-01 00:00:22 fnard:18 fnok:16 cark:3 gnuck:8',
 u'2000-01-01 00:00:32 fnard:7 fnok:16 cark:7 gnuck:37',
 u'2000-01-01 00:00:40 fnard:6 fnok:14 cark:3 gnuck:30',
 u'2000-01-01 00:00:47 fnard:11 fnok:10 cark:17 gnuck:7',
 u'2000-01-01 00:00:55 fnard:9 fnok:14 cark:13 gnuck:30',
 u'2000-01-01 00:00:56 fnard:10 fnok:1 cark:7 gnuck:6',
 u'2000-01-01 00:00:59 fnard:11 fnok:11 cark:12 gnuck:18',
 u'2000-01-01 00:01:03 fnard:9 fnok:13 cark:14 gnuck:49',
 u'2000-01-01 00:01:06 fnard:12 fnok:10 cark:19 gnuck:30',
 u'2000-01-01 00:01:16 fnard:0 fnok:12 cark:19 gnuck:26',
 u'2000-01-01 00:01:26 fnard:10 fnok:11 cark:10 gnuck:49',
 u'2000-01-01 00:01:30 fnard:9 fnok:5 cark:16 gnuck:13',
 u'2000-01-01 00:01:38 fnard:11 fnok:10 cark:7 gnuck:47',
 u'2000-01-01 00:01:43 fnard:2 fnok:2 cark:20 gnuck:35',
 u'2000-01-01 00:0

We now need to work with the RDD to be able to make a DataFrame. In the following cell, modify the code to create a class called `quazyilx_class` that processes a line and returns it as a slotted structure, with attributes `.datetime`, `.fnard`, `.fnok`, `.cark` and `.gnuck`.

You will need to define the Regular Expression and complete the class where it says `#Put your code here:`

In [5]:
import sys
import os,datetime,re
from pyspark.sql import Row

# Raw string so that \d and \s reach the regex engine unchanged
QUAZYILX_RE = r"(\d+-\d+-\d+\s\d+:\d+:\d+)\s(fnard:-?\d+)\s(fnok:-?\d+)\s(cark:-?\d+)\s(gnuck:-?\d+)"
quazyilx_re = re.compile(QUAZYILX_RE)

class quazyilx_class(object):  # new-style class, so __slots__ also takes effect under Python 2
    __slots__ = ['datetime','fnard','fnok','cark','gnuck']
    def __init__(self,line):
        # Parse line with the regular expression
        m = quazyilx_re.search(line)
        if not m:
            self.datetime = None
            self.fnard    = None
            self.fnok     = None
            self.cark     = None
            self.gnuck    = None
            return
        # Put your code here:
        # The module-level `import datetime` binds the module, so import the class locally
        from datetime import datetime
        self.datetime = datetime.strptime(m.group(1), '%Y-%m-%d %H:%M:%S')
        # Each value group looks like "fnard:7"; keep the integer after the colon
        self.fnard    = int(m.group(2).split(':')[1])
        self.fnok     = int(m.group(3).split(':')[1])
        self.cark     = int(m.group(4).split(':')[1])
        self.gnuck    = int(m.group(5).split(':')[1])
                 
    def __str__(self):
        return "{} fnard:{} fnok:{} cark:{} gnuck:{}".format(self.datetime,self.fnard,self.fnok,self.cark,self.gnuck)
    def __repr__(self):
        return "{} fnard:{} fnok:{} cark:{} gnuck:{}".format(self.datetime,self.fnard,self.fnok,self.cark,self.gnuck)
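Before running the parser over the full RDD, it can help to check the regular expression locally on a few sample lines. The sketch below uses a pattern equivalent to the one in the cell above; the `parse` helper and the sample lines are illustrative assumptions that follow the file format shown earlier.

```python
import re
from datetime import datetime

# Equivalent to QUAZYILX_RE, written as a raw string.
LINE_RE = re.compile(
    r"(\d+-\d+-\d+\s\d+:\d+:\d+)\s(fnard:-?\d+)\s(fnok:-?\d+)\s(cark:-?\d+)\s(gnuck:-?\d+)")

def parse(line):
    """Return (datetime, fnard, fnok, cark, gnuck), or None if the line does not match."""
    m = LINE_RE.search(line)
    if m is None:
        return None
    when = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
    # Groups 2..5 look like "fnard:7"; keep the integer after the colon.
    values = [int(m.group(i).split(":")[1]) for i in range(2, 6)]
    return (when,) + tuple(values)

good = parse("2000-01-01 00:00:03 fnard:7 fnok:8 cark:19 gnuck:25")
bad_reading = parse("2000-01-28 03:07:44 fnard:-1 fnok:-1 cark:-1 gnuck:-1")
garbage = parse("not a quazyilx line")
print(good)
print(bad_reading)
print(garbage)
```

The `-?` in each value group is what lets the malfunctioning `-1` readings parse as integers instead of being rejected.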

You will then need to turn the `quazyilx` RDD into an RDD of `Row()` objects. [Look at the notebook we used in Lab08 for reference](https://github.com/gu-anly502/lab08-spring-2018/blob/master/social-characteristics-of-the-marvel-universe.ipynb). You can do that with a lambda function, like this:

```python
lambda q: Row(datetime=q.datetime.isoformat(), fnard=q.fnard, fnok=q.fnok, cark=q.cark, gnuck=q.gnuck)
```

Alternatively, you can add a new method to `quazyilx_class` called `.Row()` that returns a Row. These approaches are more or less equivalent; you just need to pick one of them. You may find it useful to look at [this documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection).
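A minimal sketch of the method-based alternative might look like the following. The class name `ParsedLine` is hypothetical, and so that the sketch can run outside the cluster it falls back to a `namedtuple`-based stand-in when `pyspark` is not installed; that stand-in is an assumption for illustration, not the real `Row`.

```python
import re
from collections import namedtuple
from datetime import datetime

try:
    from pyspark.sql import Row
except ImportError:
    # Stand-in so the sketch runs without Spark; NOT the real pyspark Row.
    def Row(**fields):
        return namedtuple("Row", sorted(fields))(**fields)

LINE_RE = re.compile(
    r"(\d+-\d+-\d+\s\d+:\d+:\d+)\sfnard:(-?\d+)\sfnok:(-?\d+)\scark:(-?\d+)\sgnuck:(-?\d+)")

class ParsedLine(object):  # hypothetical name, standing in for quazyilx_class
    __slots__ = ["datetime", "fnard", "fnok", "cark", "gnuck"]

    def __init__(self, line):
        m = LINE_RE.search(line)
        if m is None:
            # Design choice for this sketch: fail loudly on malformed lines.
            raise ValueError("unparseable line: {!r}".format(line))
        self.datetime = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
        self.fnard, self.fnok, self.cark, self.gnuck = (
            int(m.group(i)) for i in range(2, 6))

    def Row(self):
        # Inside a method, the name `Row` resolves to the module-level Row.
        return Row(datetime=self.datetime.isoformat(), fnard=self.fnard,
                   fnok=self.fnok, cark=self.cark, gnuck=self.gnuck)

r = ParsedLine("2000-01-01 00:00:03 fnard:7 fnok:8 cark:19 gnuck:25").Row()
print(r)
```

With a class like this, the conversion would be a single map over the RDD, for example `quazyilx.map(lambda text: ParsedLine(text).Row())`.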

In the next cell, create an RDD called `line` that converts each element of the `quazyilx` RDD into a `Row()` object using the `quazyilx_class`.

In [6]:
from pyspark.sql import Row

# Parse each line once with quazyilx_class, then build the Row from the parsed object
line = quazyilx.map(quazyilx_class).map(lambda q: Row(datetime=q.datetime.isoformat(), fnard=q.fnard, fnok=q.fnok, cark=q.cark, gnuck=q.gnuck))

Look at the first 10 rows to make sure everything is working.

In [7]:
line.take(10)

[Row(cark=19, datetime='2000-01-01T00:00:03', fnard=7, fnok=8, gnuck=25),
 Row(cark=16, datetime='2000-01-01T00:00:08', fnard=14, fnok=19, gnuck=37),
 Row(cark=12, datetime='2000-01-01T00:00:17', fnard=12, fnok=11, gnuck=8),
 Row(cark=3, datetime='2000-01-01T00:00:22', fnard=18, fnok=16, gnuck=8),
 Row(cark=7, datetime='2000-01-01T00:00:32', fnard=7, fnok=16, gnuck=37),
 Row(cark=3, datetime='2000-01-01T00:00:40', fnard=6, fnok=14, gnuck=30),
 Row(cark=17, datetime='2000-01-01T00:00:47', fnard=11, fnok=10, gnuck=7),
 Row(cark=13, datetime='2000-01-01T00:00:55', fnard=9, fnok=14, gnuck=30),
 Row(cark=7, datetime='2000-01-01T00:00:56', fnard=10, fnok=1, gnuck=6),
 Row(cark=12, datetime='2000-01-01T00:00:59', fnard=11, fnok=11, gnuck=18)]

In the following cell, convert the RDD of rows (`line`) into a DataFrame `quazyilx_df` using the `spark.createDataFrame` method, and register it as the SQL table `quazyilx_tbl` with the `.createOrReplaceTempView` method. You will want to cache the DataFrame so it doesn't get regenerated every time you run a query.

In [9]:
# Build a DataFrame from the RDD of Rows, register it as a SQL table, and cache it
quazyilx_df = spark.createDataFrame(line)
quazyilx_df.createOrReplaceTempView("quazyilx_tbl")
quazyilx_df.cache()

DataFrame[cark: bigint, datetime: string, fnard: bigint, fnok: bigint, gnuck: bigint]
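The three steps above can be bundled into a small helper. This is a sketch only: `register_cached_table` is a hypothetical name, and it assumes it is handed the notebook's `spark` session and an RDD of `Row` objects. Note that `cache()` is lazy, so nothing is read or stored until the first action runs on the DataFrame.

```python
def register_cached_table(spark, row_rdd, table_name):
    """Create a DataFrame from an RDD of Rows, register it as a temp view, and mark it cached."""
    df = spark.createDataFrame(row_rdd)     # schema is inferred from the Row fields
    df.createOrReplaceTempView(table_name)  # makes the DataFrame visible to spark.sql()
    df.cache()                              # lazy: filled by the first action on df
    return df

# In the notebook this would be called as, for example:
#   quazyilx_df = register_cached_table(spark, line, "quazyilx_tbl")
```

Because the cache is only filled by the first action, the first query pays the full cost of reading and parsing the file, and later queries read from memory.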

Once you create and register the dataframe and table, you will run SQL queries using  `spark.sql()` to calculate the following:

1. The number of rows in the dataset
1. The number of lines that have -1 for `fnard`, `fnok`, `cark` and `gnuck`.
1. The number of lines that have -1 for `fnard` but have `fnok > 5` and `cark > 5`
1. The first datetime in the dataset
1. The first datetime that has -1 for all of the values
1. The last datetime in the dataset
1. The last datetime that has a -1 for all of the values

Place each query into one of the following seven cells and run it to get the results. Remember, running the query statement itself will not give you the results you want. You need to do something else to "get" the result.

**Note: in development testing, the first query may take approximately 10-15 minutes to run with the cluster configuration for this assignment (1 master, 4 task nodes of m4.xlarge). If you cache() correctly, all subsequent queries should take no more than 5 seconds.**
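To make the point about "getting" the result concrete: `spark.sql()` only returns a DataFrame describing the query, and an action such as `.take()`, `.collect()`, `.first()` or `.show()` is what actually runs it. The helper below is a hedged sketch; `run_scalar` is a hypothetical name and it assumes the query returns a single row with a single column.

```python
def run_scalar(spark, query):
    """Run a query expected to yield one row with one column and return that value."""
    result_df = spark.sql(query)   # builds a DataFrame; nothing is computed yet
    first_row = result_df.first()  # an action: runs the job, returns a Row or None
    return None if first_row is None else first_row[0]

# In the notebook this would be called as, for example:
#   run_scalar(spark, "SELECT COUNT(*) FROM quazyilx_tbl")
```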


In [11]:
df1 = spark.sql("""SELECT count(*) FROM quazyilx_tbl""")
df1.take(10)

[Row(count(1)=100000000)]

In [12]:
df2 = spark.sql("""
SELECT COUNT(*)
FROM quazyilx_tbl
WHERE cark = -1 AND fnard = -1 AND fnok = -1 AND gnuck = -1
""")
df2.take(10)

[Row(count(1)=190)]

In [13]:
df3 = spark.sql("""
SELECT COUNT(*)
FROM quazyilx_tbl
WHERE cark > 5 AND fnard  = -1 AND fnok > 5
""")
df3.take(10)

[Row(count(1)=2114009)]

In [14]:
df4 = spark.sql("""
SELECT datetime
FROM quazyilx_tbl
ORDER BY datetime
LIMIT 1
""")
df4.take(1)

[Row(datetime=u'2000-01-01T00:00:03')]

In [15]:
df5 = spark.sql("""
SELECT datetime
FROM quazyilx_tbl
WHERE cark = -1 AND fnard = -1 AND fnok = -1 AND gnuck = -1
ORDER BY datetime
LIMIT 1
""")
df5.take(1)

[Row(datetime=u'2000-01-28T03:07:44')]

In [16]:
df6 = spark.sql("""
SELECT datetime
FROM quazyilx_tbl
ORDER BY datetime DESC
LIMIT 1
""")
df6.take(1)

[Row(datetime=u'2017-06-05T18:03:07')]

In [17]:
df7 = spark.sql("""
SELECT datetime
FROM quazyilx_tbl
WHERE cark = -1 AND fnard = -1 AND fnok = -1 AND gnuck = -1
ORDER BY datetime DESC
LIMIT 1
""")
df7.take(1)

[Row(datetime=u'2017-04-21T04:57:10')]
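The `datetime` column in these queries is a string, so `ORDER BY datetime` compares text rather than timestamps. That is safe here because fixed-width ISO 8601 strings (four-digit years, whole seconds) sort in the same order as the instants they represent. The sketch below checks this in plain Python, using the four datetimes that appear in the results above.

```python
from datetime import datetime

stamps = [
    datetime(2017, 6, 5, 18, 3, 7),
    datetime(2000, 1, 1, 0, 0, 3),
    datetime(2000, 1, 28, 3, 7, 44),
    datetime(2017, 4, 21, 4, 57, 10),
]
iso = [s.isoformat() for s in stamps]

# Sorting the strings gives the same order as sorting the datetime objects.
same_order = sorted(iso) == [s.isoformat() for s in sorted(stamps)]
earliest, latest = min(iso), max(iso)
print(same_order, earliest, latest)
```

The same property means `MIN(datetime)` and `MAX(datetime)` would return the same values as the `ORDER BY ... LIMIT 1` queries.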

When you finish this problem, click File -> 'Save and Checkpoint' in the menu bar to make sure that the latest version of the notebook file is saved. Also, before you close this notebook and move on, make sure you stop your SparkContext; otherwise you will not be able to re-allocate resources. Remember, you will commit the .ipynb file to the repository for submission (from the master node terminal).

In [18]:
sc.stop()