# Creating RDDs

In [1]:
import pyspark as ps

# Start the SparkContext that the cells below refer to as `sc`; no master or
# app name is passed here, so whatever defaults are configured apply.
sc = ps.SparkContext()

In [2]:
# From a text file
# Alternative with an explicit file:// path (needs `cwd` to be defined first):
# rdd = sc.textFile('file://' + cwd + '/data/donors_choose/project_sample.csv')
rdd_text = sc.textFile('opendata_projects000')

# From a Python list
rdd_python = sc.parallelize([1,2,3,4,5,6])

# From json
# TODO: no example recorded yet.

# From the cloud (S3)
# Replace <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> with your credentials.
# NOTE(review): keys embedded in the URL end up in notebook history and possibly in logs;
# consider supplying them through configuration instead - confirm what your setup supports.
link = 's3n://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@mortar-example-data/airline-data'
rdd_cloud = sc.textFile(link)

# CSV - use SparkCSV to read in csv data
# TODO: no example recorded yet.


# SparkContext

In [None]:
# Create a local Spark context for debugging.
# 'local[*]' asks for one worker thread per core; plain 'local' runs a single
# thread, so the master string would not show the [*] mentioned below.
# NOTE: Spark allows only one active SparkContext - call sc.stop() on any
# earlier context before running this cell.
sc = ps.SparkContext('local[*]')

# The master URL confirms that all cores are used: 'local[*]'
sc.master

# EDA 

In [None]:
# Distinct counts of one field, using plain RDD methods.
# (Open question: could a filter plus a sum produce the same number?)
# 1) approximate distinct count
(rdd_dict
    .map(lambda record: record['_schoolid'])
    .countApproxDistinct())
# 2) exact distinct count
(rdd_dict
    .map(lambda record: record['_schoolid'])
    .distinct()
    .count())

# More fancy counting below - see Accumulators and Counters



## Using Dataframes in EDA

In [None]:

# Null handling: list the rows with no 'students_reached', then replace those nulls with 0.
(df_force
    .filter(df_force['students_reached'].isNull())
    .select('students_reached', 'funding_status')
    .collect())
df_no_null = df_force.fillna(0, ['students_reached'])

# Frequent items (the second argument of freqItems is the support)
freq_items = df_no_null.freqItems(
    [
        'school_city',
        'primary_focus_area',
        'grade_level',
        'poverty_level',
        'resource_type',
    ],
    0.7,
).collect()

df_no_null.freqItems(['num_donors'], 0.3).collect()[0]

# Distributions and histograms

# Row count per funding status
df_no_null.groupby('funding_status').count().show()

# describe() over the selected columns; the two long price columns get short aliases
(df_no_null
    .select(
        'total_donations',
        'num_donors',
        'students_reached',
        df_no_null['total_price_excluding_optional_support'].alias('p_exclude'),
        df_no_null['total_price_including_optional_support'].alias('p_include'),
    )
    .describe()
    .show())

# Outliers
# The three largest prices are massive and would skew the histogram buckets
outliers = price_rdd.top(3)

# For continuous columns the RDD histogram method does the bucketing
# (100 buckets here), applied after dropping the outlier values
hist = (
    price_rdd
    .filter(lambda price: price not in outliers)
    .histogram(100)
)

# Plotting the histogram: 
import matplotlib.pyplot as plt
import pandas as pd

%pylab inline
def plot_rdd_hist(hist):
    """Plot the result of an RDD ``histogram`` call as a line chart.

    hist: pair ``(edges, counts)`` in which ``edges`` holds one more entry
        than ``counts`` (bucket boundaries, then the count per bucket).

    Each count is plotted against the midpoint of its bucket.
    """
    edges, counts = hist[0], hist[1]

    # Divide by 2.0: integer edges would be floor-divided by `/ 2` under Python 2
    midpoints = [(low + high) / 2.0 for low, high in zip(edges, edges[1:])]

    pd.DataFrame({'counts': counts, 'index': midpoints}).set_index('index').plot(figsize=(16,5))

# Cheap histogram: only prices under 5000, bucketed into 100 bins
cheap_histogram = (
    price_rdd
    .filter(lambda price: price < 5000)
    .histogram(100)
)

'''The true power of Spark comes when we start passing data and operations between the local driver and the Spark context. 
In doing so we can combine operations that are most efficient in Spark on a cluster with local methods that operate 
on smaller data.
Here we are using Spark to do the heavy lifting of creating distributions of the relevant queries, and then 
exploring/visualizing the condensed data with pandas and matplotlib locally.'''
# A generic Spark histogram plotter
def spark_histogram(df, column):
    """Bar-plot how many rows of ``df`` carry each value of ``column``.

    The grouping and counting run in Spark; only the resulting table of
    per-value counts is brought to the driver as a pandas DataFrame.

    df: Spark DataFrame to summarise.
    column: name of the column to group by; its values must convert to float.

    Returns whatever pandas' ``plot`` returns, drawn from the first 50 values
    in ascending order.
    """
    value_counts = df.groupby(column).count().toPandas()
    # Cast the requested column itself rather than a hard-coded `num_donors`,
    # so the function works for any column name
    value_counts[column] = value_counts[column].astype(float)
    # set_index + sort_index orders rows by the column's values without relying
    # on DataFrame.sort, which newer pandas releases no longer provide
    ordered = value_counts.set_index(column).sort_index()
    return ordered.iloc[:50, :].plot(kind='bar', figsize=(14,5))

# Histogram of num_donors for df_complete
spark_histogram(df_complete, 'num_donors')

# Bring the per-num_donors row counts to the driver as pandas DataFrames
complete = (
    df_complete
    .groupby('num_donors')
    .count()
    .toPandas()
)
expired = (
    df_expired
    .groupby('num_donors')
    .count()
    .toPandas()
)

# Correlation between two columns - note that df_no_null is a DataFrame
# (corr is reached through .stat), not an RDD
df_no_null.stat.corr(
    'total_price_excluding_optional_support',
    'num_donors',
)

# Crosstabs: row counts for every combination of two categorical/boolean
# fields, which gives valuable facets of the data
df_no_null.crosstab(
    'resource_type',
    'funding_status',
).show()
df_no_null.crosstab(
    'primary_focus_area',
    'resource_type',
).show()

# Output below: like a pivot table in Excel. 

+----------------------------+-----+---------+-----------+-------+
|resource_type_funding_status| live|completed|reallocated|expired|
+----------------------------+-----+---------+-----------+-------+
|                        null|    2|       28|          0|     18|
|                       Other| 4542|    54610|        747|  22550|
|                       Books| 5982|   118810|       1527|  34554|
|                    Visitors|  102|      806|          6|    341|
|                    Supplies|11939|   185870|       2602|  63406|
|                       Trips|  347|     4381|         62|   1474|
|                  Technology|18957|   150500|       2256|  85510|
+----------------------------+-----+---------+-----------+-------+

+--------------------------------+-----+--------+-----+----------+------+--------+----+
|primary_focus_area_resource_type|Trips|Visitors|Other|Technology| Books|Supplies|null|
+--------------------------------+-----+--------+-----+----------+------+--------+----+
|             Literacy & Language|  630|     228|32795|    109605|127282|   75924|   4|
|                            null|    0|       0|    0|         0|     1|       0|  41|
|                Applied Learning| 1197|     104| 9429|     17869|  4863|   22596|   0|
|                  Math & Science| 1902|     323|16353|     75189| 11746|   89101|   3|
|                Music & The Arts|  947|     441| 8305|     19289|  2883|   37804|   0|
|                 Health & Sports|  159|      54| 4633|      3054|   432|   12970|   0|
|                   Special Needs|  241|      32| 7636|     19359|  4112|   17151|   0|
|                History & Civics| 1188|      73| 3298|     12858|  9554|    8271|   0|
+--------------------------------+-----+--------+-----+----------+------+--------+----+

In [None]:
# RDD equivalent of value_counts(): emit (value, 1) pairs and add them up per key
(rdd_dict
    .map(lambda record: (record['teacher_ny_teaching_fellow'], 1))
    .reduceByKey(lambda left, right: left + right)
    .collect())

## Missing or null values

To get more info on this, go to https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrameNaFunctions.html

This site lists all the functions that are available like dropna(column name), fillna(), replace(). These are attached to the dataframe structure. 
There is also column.isNull(). An example is illustrated below - research this when required. 

**Accumulators** are used to find how many null values there are in columns.

# Line Splitting

In [4]:
# Use the csv library to split CSV-formatted lines so that commas inside quoted strings are not a problem

import csv
record = '45,3,27.1,Jonathan,Dinu,"Galvanize","San Francisco , CA", 26'
# Naive approach: also splits on the comma inside the quoted "San Francisco , CA"
naive = record.split(',')
# csv.reader takes an iterable of lines; next() pulls the single parsed row.
# The builtin next() works on Python 2.6+ and 3, unlike the Python-2-only .next() method.
csv_lib = next(csv.reader([record]))
# Single-argument print(...) prints the same text under Python 2 and 3
print(naive)
print("Naive splitting creates {0} fields".format(len(naive)) + "\n")
print(csv_lib)
print("Using the built in CSV library creates {0} fields".format(len(csv_lib)))

['45', '3', '27.1', 'Jonathan', 'Dinu', '"Galvanize"', '"San Francisco ', ' CA"', ' 26']
Naive splitting creates 9 fields

['45', '3', '27.1', 'Jonathan', 'Dinu', 'Galvanize', 'San Francisco , CA', ' 26']
Using the built in CSV library creates 8 fields


# Type conversion utility functions

In [None]:
# Format the date string 
# http://strftime.org/
from datetime import datetime
def date_parse(datestring):
    """Turn a 'YYYY-MM-DD' string into a 'YYYY-MM-DD HH:MM:SS' string.

    datestring: date text in %Y-%m-%d form; may be '' or None.

    Returns None for a missing value ('' or None), otherwise str() of the
    parsed datetime. Raises ValueError when the text is not in the expected
    format.
    """
    # Treat None like the empty string instead of letting strptime raise TypeError
    if not datestring:
        return None
    return str(datetime.strptime(datestring, '%Y-%m-%d'))

# Return booleans the way you need to: 
def boolean_map(field):
    """Map the flags 't' / 'f' to True / False.

    Any other value (including '' and None) yields None.
    """
    if field == 't':
        return True
    if field == 'f':
        return False
    # Explicit return: a bare `None` expression here would be a no-op statement
    return None

# Tricks

In [None]:
# Trick: count the records that match a condition
# Map each row to the boolean result of the test and sum it (True counts as 1, False as 0).
# This has to be map(), not filter(): filter() would keep the malformed rows themselves,
# and rows cannot be summed.

rows_in_error = rdd_csv.map(lambda row: len(row) != header_columns).sum()

# Trick: foreach
# Calling foreach on an rdd runs a function over every record without returning anything.
# Here throw_exception (defined elsewhere) is expected to raise when a row is malformed.
rdd_csv.foreach(throw_exception)

# Spark more admin-type features:

In [None]:
# Show an rdd's lineage by printing its debug string
# (call-style print works under both Python 2 and 3)
print(destination_rdd.toDebugString())

# Cache rdds that will be used a lot
destination_rdd.cache()


# Useful python commands

In [None]:
# str is handed straight to map() to turn every row into its string form
# before de-duplicating with distinct()
# NOTE(review): `rdd_csv_corrrect` is spelled with three r's - confirm it matches the name defined earlier
rdd_no_dups = rdd_csv_corrrect.map(str).distinct()