# Dataframes 

* Dataframes are a restricted sub-type of RDDs. 
* Restricting the type allows for more optimization.

* Dataframes store two dimensional data, similar to the type of data stored in a spreadsheet. 
   * Each column in a dataframe can have a different type.
   * Each row contains a `record`.

* Similar to [pandas dataframes](http://pandas.pydata.org/pandas-docs/stable/dsintro.html#dataframe) and [R dataframes](http://www.r-tutor.com/r-introduction/data-frame)

In [1]:
#import findspark
#findspark.init()
from pyspark import SparkContext
sc = SparkContext(master="local[4]")
sc.version

'3.2.1'

In [2]:
import os
import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
%pylab inline

%pylab is deprecated, use %matplotlib inline and import the required libraries.
Populating the interactive namespace from numpy and matplotlib


In [3]:
# Just like using Spark requires having a SparkContext, using SQL requires an SQLContext
sqlContext = SQLContext(sc)
sqlContext 



<pyspark.sql.context.SQLContext at 0xffffbc229850>

## Spark sessions

[A newer API for spark dataframes](https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession)

We will stick to the old API in this class.

A new interface object has been added in **Spark 2.0** called **SparkSession**. A spark session is initialized using a `builder`. For example
```python
spark = SparkSession.builder \
         .master("local") \
         .appName("Word Count") \
         .config("spark.some.config.option", "some-value") \
         .getOrCreate()
```

Using a SparkSession a Parquet file is read [as follows:](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.parquet):
```python
df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
```

### Constructing a DataFrame from an RDD of Rows
Each Row defines it's own  fields, the schema is *inferred*.

In [4]:
# One way to create a DataFrame is to first define an RDD from a list of Rows 
_list=[Row(name=u"John", age=19),
       Row(name=u"Smith", age=23),
       Row(name=u"Sarah", age=18)]
some_rdd = sc.parallelize(_list)
some_rdd.collect()

[Row(name='John', age=19),
 Row(name='Smith', age=23),
 Row(name='Sarah', age=18)]

In [5]:
# The DataFrame is created from the RDD or Rows
# Infer schema from the first row, create a DataFrame and print the schema
some_df = sqlContext.createDataFrame(_list)

In [6]:
some_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [7]:
# A dataframe is an RDD of rows plus information on the schema.
# performing **collect()* on either the RDD or the DataFrame gives the same result.
print(type(some_rdd),type(some_df))
print('some_df =',some_df.collect())
print('some_rdd=',some_rdd.collect())

<class 'pyspark.rdd.RDD'> <class 'pyspark.sql.dataframe.DataFrame'>
some_df = [Row(name='John', age=19), Row(name='Smith', age=23), Row(name='Sarah', age=18)]
some_rdd= [Row(name='John', age=19), Row(name='Smith', age=23), Row(name='Sarah', age=18)]


### Defining the Schema explicitly
The advantage of creating a DataFrame using a pre-defined schema allows the content of the RDD to be simple tuples, rather than rows.

In [8]:
# In this case we create the dataframe from an RDD of tuples (rather than Rows) and provide the schema explicitly
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", IntegerType(), False)])
  
# Create a DataFrame by applying the schema to the RDD and print the schema
another_df = sqlContext.createDataFrame(another_rdd, schema)
another_df.printSchema()
# root
#  |-- age: binteger (nullable = true)
#  |-- name: string (nullable = true)

root
 |-- person_name: string (nullable = false)
 |-- person_age: integer (nullable = false)



## Loading DataFrames from disk
There are many maethods to load DataFrames from Disk. Here we will discuss three of these methods
1. Parquet
2. JSON (on your own)
3. CSV  (on your own)

In addition, there are API's for connecting Spark to an external database. We will not discuss this type of connection in this class.

### Loading dataframes from JSON files
[JSON](http://www.json.org/) is a very popular readable file format for storing structured data.
Among it's many uses are **twitter**, `javascript` communication packets, and many others. In fact this notebook file (with the extension `.ipynb` is in json format. JSON can also be used to store tabular data and can be easily loaded into a dataframe.

In [21]:
# when loading json files you can specify either a single file or a directory containing many json files.
print('--- json file')
path = "../../Data/people.json"
!cat $path 

# Create a DataFrame from the file(s) pointed to by path
people = sqlContext.read.json(path)
print('\n--- dataframe\n people is a',type(people))
# The inferred schema can be visualized using the printSchema() method.
people.show()

print('--- Schema')
people.printSchema()

--- json file
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

--- dataframe
 people is a <class 'pyspark.sql.dataframe.DataFrame'>
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

--- Schema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



### Excercise: Loading csv files into dataframes

Spark 2.0 includes a facility for reading csv files. In this excercise you are to create similar functionality using your own code.

You are to write a class called `csv_reader` which has the following methods:

* `__init__(self,filepath):` recieves as input the path to a csv file. It throws an exeption `NoSuchFile` if the file does not exist.
* `Infer_Schema()` opens the file, reads the first 10 lines (or less if the file is shorter), and infers the schema. The first line of the csv file defines the column names. The following lines should have the same number of columns and all of the elements of the column should be of the same type. The only types allowd are `int`,`float`,`string`. The method infers the types of the columns, checks that they are consistent, and defines a dataframe schema of the form:
```python
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", IntegerType(), False)])
```
If everything checks out, the method defines a `self.` variable that stores the schema and returns the schema as it's output. If an error is found an exception `BadCsvFormat` is raised.
* `read_DataFrame()`: reads the file, parses it and creates a dataframe using the inferred schema. If one of the lines beyond the first 10 (i.e. a line that was not read by `InferSchema`) is not parsed correctly, the line is not added to the Dataframe. Instead, it is added to an RDD called `bad_lines`.
The methods returns the dateFrame and the `bad_lines` RDD.

### Parquet files

* [Parquet](http://parquet.apache.org/) is a popular columnar format.

* Spark SQL allows [SQL](https://en.wikipedia.org/wiki/SQL) queries to retrieve a subset of the rows without reading the whole file.

* Compatible with HDFS : allows parallel retrieval on a cluster.

* Parquet compresses the data in each column.

* `<reponame>.parquet` is usually a **directory** with many files or subdirectories.

### Spark and Hive
* Parquet is a **file format** not an independent database server.
* Spark can work with the [Hive](https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started) relational database system that supports the full array of database operations.
* Hive is compatible with HDFS.

In [24]:
dir='../../Data'
parquet_file=dir+"/users.parquet"
!ls $dir

mllib	       namesAndFavColors.parquet  users.parquet
Moby-Dick.txt  people.json		  Weather


In [25]:
#load a Parquet file
print(parquet_file)
df = sqlContext.read.load(parquet_file)
df.show()

../../Data/users.parquet
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [26]:
df2=df.select("name", "favorite_color")
df2.show()

+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+



In [42]:
outfilename="namesAndFavColors.parquet"
!rm -rf $dir/$outfilename
df2.write.save(dir+"/"+outfilename)
!ls -ld $dir/$outfilename

Py4JJavaError: An error occurred while calling o112.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.FileNotFoundException: /home/jovyan/public-notebooks/notebooks/Data/namesAndFavColors.parquet/._SUCCESS.crc (Operation not permitted)
	at java.base/java.io.FileOutputStream.open0(Native Method)
	at java.base/java.io.FileOutputStream.open(FileOutputStream.java:298)
	at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:237)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:321)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:437)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:440)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$20(FileFormatWriter.scala:240)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:240)
	... 41 more


## Loading a dataframe from a pickle file
Here we are loading a dataframe from a pickle file stored on S3. The pickle file contains meterological data that we will work on in future classes.

In [33]:
from os.path import split,join,exists
from os import mkdir,getcwd,remove
from glob import glob

# create directory if needed

notebook_dir=getcwd()
data_dir=''
weather_dir=join(data_dir,'Weather')
weather_dir='/Users/yoavfreund/academic.papers/Courses/BigDataAnalytics/DSC291_2020/Public-DSC291/Data/Weather/'
print(weather_dir)


/Users/yoavfreund/academic.papers/Courses/BigDataAnalytics/DSC291_2020/Public-DSC291/Data/Weather/


In [34]:
if exists(weather_dir):
    print('directory',weather_dir,'already exists')
else:
    print('making',weather_dir)
    mkdir(weather_dir)

file_index='BBSSBBSS'
zip_file='US_Weather_%s.csv.gz'%file_index #the .csv extension is a mistake, this is a pickle file, not a csv file.
old_files='%s/%s*'%(data_dir,zip_file[:-3])
for f in glob(old_files):
    print('removing',f)
    remove(f)

making /Users/yoavfreund/academic.papers/Courses/BigDataAnalytics/DSC291_2020/Public-DSC291/Data/Weather/


FileNotFoundError: [Errno 2] No such file or directory: '/Users/yoavfreund/academic.papers/Courses/BigDataAnalytics/DSC291_2020/Public-DSC291/Data/Weather/'

In [35]:
command="curl https://mas-dse-open.s3.amazonaws.com/Weather/small/%s > %s/%s"%(zip_file,data_dir,zip_file)
print(command)
!$command
!ls -lh $data_dir/$zip_file

NameError: name 'zip_file' is not defined

In [36]:
!gunzip --keep $data_dir/$zip_file
filename='%s/US_Weather_%s.csv'%(data_dir,file_index)
!ls -lh $filename
import pickle
List=pickle.load(open(filename,'rb'))
len(List)

gzip: / is a directory -- ignored


NameError: name 'file_index' is not defined

## Lets have a look at a real-world dataframe

This dataframe is a small part from a large dataframe (15GB) which stores meteorological data from stations around the world.

In [37]:
#List is a list of Rows. Stored as a pickle file.
df=sqlContext.createDataFrame(List)
print(df.count())
df.show(1)

NameError: name 'List' is not defined

In [38]:
#selecting a subset of the rows so it fits in slide.
df.select('station','year','measurement').show(5)

AnalysisException: cannot resolve 'station' given input columns: [favorite_color, favorite_numbers, name];
'Project ['station, 'year, 'measurement]
+- Relation [name#52,favorite_color#53,favorite_numbers#54] parquet


In [39]:
### Save dataframe as Parquet repository
filename='%s/US_Weather_%s.parquet'%(data_dir,file_index)
!rm -rf $filename
df.write.save(filename)

NameError: name 'file_index' is not defined

## Summary
* Dataframes are an efficient way to store data tables.
* All of the values in a column have the same type.
* A good way to store a dataframe in disk is to use a Parquet file.
* Next: Operations on dataframes.

In [40]:
!du -sh $filename
!du -sh $data_dir/$zip_file

3.1M	.
du: cannot read directory '/etc/ssl/private': Permission denied
du: cannot read directory '/var/cache/apt/archives/partial': Permission denied
du: cannot read directory '/var/cache/ldconfig': Permission denied
du: cannot read directory '/root': Permission denied
du: cannot read directory '/proc/tty/driver': Permission denied
du: cannot access '/proc/1511/task/1511/fd/3': No such file or directory
du: cannot access '/proc/1511/task/1511/fdinfo/3': No such file or directory
du: cannot access '/proc/1511/fd/4': No such file or directory
du: cannot access '/proc/1511/fdinfo/4': No such file or directory
63G	/
