# Overview

- [0. Import Packages and Connect to Hive](#0)
- [1. Use Spark SQL API](#1)
  - [1.1. Cache / Unpersist](#1.1)
  - [1.2. Data Schema](#1.2)
  - [1.3. Take a Peek](#1.3)
  - [1.4. Basic Statistics](#1.4)
  - [1.5. Aggregation](#1.5)
  - [1.6. Subsetting](#1.6)
  - [1.7. Transformation](#1.7)
  - [1.8. Sorting](#1.8)
  - [1.9. Sampling](#1.9)
  - [1.10. Metrics for Two Columns](#1.10)
  - [1.11. Operations for Multiple Dataframes](#1.11)
  - [1.12. Save to File](#1.12)
  - [1.13. Partitions](#1.13)
  - [1.14. Others](#1.14)
- [2. Iterate All Tables of a Hive Database](#2)
- [3. References](#3)

# 0. Import Packages and Connect to Hive <a id='0'></a>

In [None]:
# Import packages
import time
from pyspark.sql import HiveContext, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.stat import Statistics

# Connect to Hive through a HiveContext built on the notebook's SparkContext.
# NOTE(review): `sc` is assumed to be pre-defined by the pyspark kernel/shell; it is not created in this notebook.
hc = HiveContext(sparkContext=sc)

In [None]:
### Create a DataFrame that contains all data from a Hive table ("<database>.<table>").
source_table = "ids_meds_mlcustomer.dbo_customeraddress"  # 4885823 rows
df = hc.table(source_table)

# 1. Use Spark SQL API<a id='1'></a>

## 1.1. Cache / Unpersist<a id='1.1'></a>

In [None]:
### cache() - Persist df with the default storage level (documented as MEMORY_ONLY_SER for this Spark version).
### The data is kept after the first time it is computed, so later actions on df run *** MUCH FASTER ***.
cached_df = df.cache()
cached_df

In [None]:
### persist(storageLevel=StorageLevel(False, True, False, False, 1)) - Sets the storage level to persist its values
###   across operations after the first time it is computed. This can only be used to assign a new storage level
###   if the DataFrame does not have a storage level set yet. If no storage level is specified, it defaults to (MEMORY_ONLY_SER).
### Reference of Storage Levels: http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
# StorageLevel lives in the top-level pyspark package. The star imports of pyspark.sql.functions and
# pyspark.sql.types in the first cell do not provide it, so without this import the cell raises NameError.
from pyspark import StorageLevel

# NOTE: if the cache() cell above was run, df already has a storage level. Per the comment above, a new
# level can only be assigned when none is set, so run the unpersist() cell first for MEMORY_ONLY to apply.
df.persist(StorageLevel.MEMORY_ONLY)

In [None]:
### unpersist(blocking=True) - Mark df as non-persistent and remove all of its blocks from memory and disk.
released_df = df.unpersist()
released_df

## 1.2. Data Schema<a id='1.2'></a>

In [None]:
### columns - The names of all columns, as a Python list.
column_names = df.columns
column_names

In [None]:
### dtypes - A list of (column name, data type) pairs, one per column.
column_types = df.dtypes
column_types

In [None]:
### printSchema() - Prints the schema in tree format (column names, types and nullability) to stdout.
### It prints rather than returns, so nothing is shown as the cell's output value.
df.printSchema()

In [None]:
### schema - The schema of this DataFrame as a types.StructType.
table_schema = df.schema
table_schema

## 1.3. Take a Peek<a id='1.3'></a>

In [None]:
### count() - Number of rows in this DataFrame (triggers a full scan).
row_count = df.count()
row_count

In [None]:
### take(num) - The first `num` rows as a list of Row objects.
df.take(num=2)

In [None]:
### first() - The first row as a single Row.
first_row = df.first()
first_row

In [None]:
### head(n=None) - The first n rows.
df.head(n=2)

In [None]:
### limit(num) - A new DataFrame capped at `num` rows; collect() then brings those rows to the driver.
df.limit(num=1).collect()

In [None]:
### Select a single column of the DataFrame as a Column object (attribute access df.city is equivalent).
city = df["city"]
city

In [None]:
### collect() - Returns all the records of the DataFrame it is called on as a list of Row.
### WARNING: collect() ships every row to the driver. On this ~4.9M-row, 26-column table that can exceed
###   spark.driver.maxResultSize and abort the job, so the demo collects a bounded subset via limit().
df.limit(10).collect()

In [None]:
### show(n=20, truncate=True) - Prints the first n rows to the console as a table.
projected = df.selectExpr("addresstype_id * 10", "concat(city, ', ', state)")
projected.show(n=20, truncate=True)

## 1.4. Basic Statistics<a id='1.4'></a>

In [None]:
### describe(*cols) - Computes summary statistics for numeric columns.
### Spark 1.6.1 only supports numerical columns; Spark 2.0 can also compute them for String columns.
# To time a much larger input instead, swap in this query:
# df = hc.sql("SELECT CAST(latitude AS FLOAT), CAST(longitude AS FLOAT) \
#                 FROM ids_weather_noaaweatherdb.noaaweatherschema_currentforecastweather") # 91240068 rows
print("Computing column-wise summary statistics ...")
describe_started = time.time()
summary = df.describe()
summary.show()
elapsed_sec = time.time() - describe_started
print("\nDone (%s sec)" % elapsed_sec)  # 5M rows 26 cols - 7 sec; 91M rows 2 col - 97 sec

In [None]:
### cube(*cols) - Multi-dimensional cube over the given columns: counts for every combination of
###   (county, state), including the subtotal rows where one or both are aggregated away (shown as null).
df.cube(df.county, 'state').count().show()

In [None]:
### explain(extended=False) - Prints the physical plan (and logical plans when extended=True) for debugging.
df.explain(extended=False)

## 1.5. Aggregation<a id='1.5'></a>

In [None]:
### groupBy(*cols) - Groups the DataFrame by the given columns; see GroupedData for the available aggregates.
rows_per_type = df.groupBy('addresstype_id').count()
rows_per_type.collect()

In [None]:
### freqItems(cols, support=None) - Finds frequent items for the given columns (false positives possible).
### support=0.001 reports values that appear in at least 0.1% of rows.
df.freqItems(cols=['addresstype_id'], support=0.001).collect()

In [None]:
### agg(*exprs) - Aggregate over the entire DataFrame without groups (shorthand for df.groupBy().agg()).
# Dict form {column: aggregate-function name}; "max" on the string column "city" returns the
# alphabetically greatest city name.
max_city_spec = {"city": "max"}
df.agg(max_city_spec).collect()
# Alternatively, use the pyspark.sql.functions module:
# from pyspark.sql import functions as F
# df.agg(F.max(df.city)).collect()

In [None]:
### GroupedData.agg(*exprs) - Compute aggregates per group and return the result as a DataFrame.
###   The available aggregate functions are avg, max, min, sum, count.
gdf = df.groupBy("addresstype_id")
count_spec = {"*": "count"}  # count all rows in each addresstype_id group
gdf.agg(count_spec).collect()

In [None]:
### rollup(*cols) - Multi-dimensional rollup over the given columns: counts per (city, state),
###   per city, and a grand total.
df.rollup(df.city, 'state').count().show()

## 1.6. Subsetting<a id='1.6'></a>

In [None]:
### filter(condition) - Keeps rows matching the condition; a SQL expression string works as well as a Column.
df.filter("addresstype_id = 1").count()

In [None]:
### select(*cols) - Projects a set of columns/expressions into a new DataFrame.
df.select(df.city, df.state).take(5)

In [None]:
### selectExpr(*expr) - Projects a set of SQL expression strings into a new DataFrame.
sql_exprs = ["addresstype_id * 10", "concat(city, ', ', state)"]
df.selectExpr(*sql_exprs).take(5)

In [None]:
### where(condition) - An alias for filter(); keeps rows matching the condition.
df.where(col("addresstype_id") == 1).count()

In [None]:
### distinct() - Returns a new DataFrame containing only the distinct rows of this DataFrame.
print("Computing distinct rows ...")
distinct_started = time.time()
distinct_rows = df.distinct().count()
print(distinct_rows)
print("\nDone (%s sec)" % (time.time() - distinct_started))  # 5M rows 26 cols - 36 sec; 91M rows 2 col - ?? sec

In [None]:
### dropDuplicates(subset=None) - Removes duplicate rows; subset=None compares all columns.
df.dropDuplicates(subset=None).show()

In [None]:
### dropna(how='any', thresh=None, subset=None) - Returns a new DataFrame omitting rows with null values.

## 1.7. Transformation<a id='1.7'></a>

In [None]:
### withColumn(colName, col) - Returns a new DataFrame with a column added (or replaced if the name exists).
city_state = concat(df.city, lit(', '), df.state)  # e.g. "<city>, <state>"
df.withColumn('citystate', city_state).select('city', 'state', 'citystate').take(5)

In [None]:
### withColumnRenamed(existing, new) - Returns a new DataFrame with an existing column renamed.
# Rename the existing 'city' column to 'city2'. The arguments were previously reversed ('city2' -> 'city');
# renaming a column that does not exist is a no-op, so that version demonstrated nothing.
df.withColumnRenamed('city', 'city2').select('city2', 'state').take(5)

In [None]:
### drop(col) - Returns a new DataFrame that drops the specified column.

In [None]:
### foreach(f) - Applies the f function to all Row of this DataFrame.

In [None]:
### map(f) - Returns a new RDD by applying the f function to each Row (df.map is shorthand for df.rdd.map).
df.rdd.map(lambda row: row.city).take(5)

In [None]:
### flatMap(f) - Returns a new RDD by applying f to each Row and then flattening the results.
# row.city is a string, so it is flattened into its individual characters; a null (None) city would
# raise a TypeError when flattened.
df.rdd.flatMap(lambda row: row.city).take(10)

In [None]:
### replace(to_replace, value, subset=None) - Returns a new DataFrame replacing a value with another value.

In [None]:
### na - Returns a DataFrameNaFunctions for handling missing values. (drop(), fill(), replace())

In [None]:
### fillna(value, subset=None) - Replace null values, alias for na.fill().

In [None]:
### toJSON(use_unicode=True) - Converts the DataFrame into an RDD of JSON strings, one per row.
json_rdd = df.toJSON(use_unicode=True)
json_rdd.first()

In [None]:
### toDF(*cols) - Returns a new DataFrame with the given column names (one name per existing column, in order).
# Derive the new names from df.columns instead of hard-coding all 26. toDF needs exactly one name per
# column, so a hard-coded list breaks as soon as the table schema gains, loses or reorders a column.
new_names = ['new_' + name for name in df.columns]
df.toDF(*new_names).take(1)

In [None]:
### toPandas() - Returns the contents of this DataFrame as a pandas.DataFrame, collected on the driver.
# Converting the full table fails with something like org.apache.spark.SparkException:
#   Job aborted due to stage failure: Total size of serialized results of 2 tasks (1084.9 MB)
#   is bigger than spark.driver.maxResultSize (1024.0 MB)
# so bound the number of rows before converting.
df.limit(1000).toPandas()

## 1.8. Sorting<a id='1.8'></a>

In [None]:
### orderBy(*cols, **kwargs) - Returns a new DataFrame sorted by the given column(s); desc() reverses the order.
df.orderBy(desc('city')).select('city').take(5)

In [None]:
### sort(*cols, **kwargs) - Returns a new DataFrame sorted by the given column(s); ascending=False sorts descending.
df.sort('city', ascending=False).select('city', 'state').take(5)

## 1.9. Sampling<a id='1.9'></a>

In [None]:
### sample(withReplacement, fraction, seed=None) - Returns a random subset of roughly `fraction` of the rows.
df.sample(withReplacement=False, fraction=0.5, seed=42).count()

In [None]:
### sampleBy(col, fractions, seed=None) - Stratified sample without replacement, one fraction per stratum value.
strata_fractions = {1: 0.2, 2: 0.2, 3: 0.3, 4: 0.3}  # addresstype_id value -> fraction to keep
sampled = df.sampleBy("addresstype_id", strata_fractions, 0)
per_type_counts = sampled.groupBy("addresstype_id").count()
per_type_counts.orderBy("addresstype_id").show()

In [None]:
### randomSplit(weights, seed=None) - Randomly splits the DataFrame; weights [1.0, 2.0] give roughly a 1/3 : 2/3 split.
splits = df.randomSplit(weights=[1.0, 2.0], seed=24)
print(", ".join(str(part.count()) for part in splits))

## 1.10. Metrics for Two Columns<a id='1.10'></a>

In [None]:
### corr(col1, col2, method=None) - Calculates the correlation of two columns of a DataFrame as a double value. Pearson only.
# Alias each CAST explicitly so the columns are guaranteed to carry the names that corr() below (and the
# cov() cell after this one) look up, rather than relying on Spark's auto-generated name for the expression.
df2 = hc.sql("SELECT addresstype_id, CAST(zipcode AS INT) AS zipcode FROM ids_meds_mlcustomer.dbo_customeraddress") # 4885823 rows
# df2 = hc.sql("SELECT CAST(latitude AS FLOAT) AS latitude, CAST(longitude AS FLOAT) AS longitude \
#                 FROM ids_weather_noaaweatherdb.noaaweatherschema_currentforecastweather") # 91240068 rows
print("Computing the Pearson correlation ...\n")
st = time.time()
print(df2.corr('addresstype_id', 'zipcode', 'pearson'))
#print(df2.corr('latitude', 'longitude', 'pearson'))
print("\nDone (" + str(time.time() - st) + " sec)") # 5M rows 2 cols, 11 sec; 91M rows 2 cols, 83 sec; 

In [None]:
### cov(col1, col2) - Calculates the sample covariance of the two named columns as a double value.
# Reuses df2 from the corr() cell above; uncomment one of these to build it here instead:
#df2 = hc.sql("SELECT addresstype_id, CAST(zipcode AS INT) FROM ids_meds_mlcustomer.dbo_customeraddress") # 4885823 rows
# df2 = hc.sql("SELECT CAST(latitude AS FLOAT), CAST(longitude AS FLOAT) \
#                 FROM ids_weather_noaaweatherdb.noaaweatherschema_currentforecastweather") # 91240068 rows
print("Computing the covariance ...\n")
cov_started = time.time()
sample_cov = df2.cov(col1='addresstype_id', col2='zipcode')
print(sample_cov)
#print(df2.cov('latitude', 'longitude'))
print("\nDone (%s sec)" % (time.time() - cov_started))  # 5M rows 2 cols, 6 sec; 91M rows 2 cols, 61 sec;

In [None]:
### crosstab(col1, col2) - Pair-wise frequency (contingency) table of the two columns.
df.crosstab(col1='addresstype_id', col2='pobox').show()

## 1.11. Operations for Multiple Dataframes<a id='1.11'></a>

In [None]:
### join(other, on=None, how=None) - Joins with another DataFrame using the given join expression.
df3 = hc.table("ids_meds_mlcustomer.dbo_customer") # 3168313 rows
join_cond = df.customer_id == df3.client_id
joined = df.join(df3, on=join_cond, how='outer')  # full outer join: unmatched rows from both sides are kept
joined.select(df.customer_id, df.addresstype_id, df3.lastname).take(3)

In [None]:
### intersect(other) - Return a new DataFrame containing rows only in both this frame and another frame.

In [None]:
### unionAll(other) - Return a new DataFrame containing union of rows in this frame and another frame.

In [None]:
### subtract(other) - Return a new DataFrame containing rows in this frame but not in another frame.

## 1.12. Save to File<a id='1.12'></a>

In [None]:
### write - Interface for saving the content of the DataFrame out into external storage.

## 1.13. Partitions<a id='1.13'></a>

In [None]:
# coalesce(numPartitions) - Returns a new DataFrame that has exactly numPartitions partitions.

In [None]:
# foreachPartition(f) - Applies the f function to each partition of this DataFrame.

In [None]:
# mapPartitions(f, preservesPartitioning=False) - Returns a new RDD by applying the f function to each partition.

In [None]:
# repartition(numPartitions, *cols) - Returns a new DataFrame partitioned by the given partitioning expressions.
#   The resulting DataFrame is hash partitioned.

In [None]:
# sortWithinPartitions(*cols, **kwargs) - Returns a new DataFrame with each partition sorted by the specified column(s).

## 1.14. Others<a id='1.14'></a>

In [None]:
# alias(alias) - Returns a new DataFrame with an alias set.

In [None]:
# drop_duplicates(subset=None) - An alias for dropDuplicates().

In [None]:
# groupby(*cols) - An alias for groupBy().

In [None]:
### isLocal() - True if collect() and take() can run locally (without any Spark executors).
runs_locally = df.isLocal()
runs_locally

In [None]:
### rdd - The content of the DataFrame as a pyspark.RDD of Row.
row_rdd = df.rdd
row_rdd.count()

In [None]:
# registerTempTable(name) - Registers this DataFrame as a temporary table using the given name.

In [None]:
# stat - Returns a DataFrameStatFunctions for statistic functions. (Covered by other APIs)

# 2. Iterate All Tables of a Hive Database<a id='2'></a>

In [None]:
# Specify the Hive database to profile (uncomment one alternative to switch)
database_name = "ids_meds_mlcustomer"
# database_name = "ids_meds_mlinterview"
# database_name = "ids_meds_mlpolicy"
# database_name = "ids_meds_mlreference"

# Either profile every table in the database, or restrict to specific tables:
tables = hc.tableNames(dbName=database_name)
#tables = ["dbo_customeraddress"]

In [None]:
# Iterate tables in the specified database and time each one (profiling skeleton; ignore this cell for now)
import sys

start = time.time()
for n in tables:
    table_name = database_name + "." + n
    # sys.stdout.write keeps "Done (...)" on the same line under both Python 2 and 3. The previous
    # `print(...),` form suppressed the newline only under Python 2; under Python 3 the trailing
    # comma just built a throwaway tuple.
    sys.stdout.write("Profiling " + table_name + " ... ")
    st = time.time()

    #rdd_row = hc.table(table_name) # Get all columns of the current table for profiling

    print("Done (" + str(time.time() - st) + " sec)")
print("Total Time: " + str(time.time() - start) + " sec")

# 3. References<a id='3'></a>

https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [None]:
# The End #