In [1]:
#
###### https://github.com/LucaCanali/Miscellaneous/blob/master/Pyspark_SQL_Magic_Jupyter/IPython_Pyspark_SQL_Magic.py
#
#
# IPython magic functions to use with Pyspark and Spark SQL
# The following code is intended as examples of shortcuts to simplify the use of SQL in pyspark
# The defined functions are:
#
# %sql <statement>          - return a Spark DataFrame for lazy evaluation of the SQL
# %sql_show <statement>     - run the SQL statement and show max_show_lines (50) lines
# %sql_display <statement>  - run the SQL statement and display the results using a HTML table 
#                           - this is implemented passing via Pandas and displays up to max_show_lines (50)
# %sql_explain <statement>  - display the execution plan of the SQL statement
#
# Use: %<magic> for line magic or %%<magic> for cell magic.
#
# Author: Luca.Canali@cern.ch
# September 2016
#

from IPython.core.magic import register_line_cell_magic

# Configuration parameters (module-level names looked up by the magics below each time they run)
max_show_lines = 50         # Row cap: passed to show() by %sql_show and to limit() by %sql_display
detailed_explain = True     # Passed to explain(); set to False if you want to see only the physical plan


@register_line_cell_magic
def sql(line, cell=None):
    """Build a Spark DataFrame from a SQL statement without triggering execution.

    Usable as ``%sql <statement>`` or ``%%sql``. When a cell body is supplied
    it is used as the statement; otherwise the text of the line is used.
    """
    statement = line if cell is None else cell
    return sqlContext.sql(statement)

@register_line_cell_magic
def sql_show(line, cell=None):
    """Run a SQL statement and print up to ``max_show_lines`` rows with ``show()``.

    Usable as ``%sql_show <statement>`` or ``%%sql_show``. When a cell body is
    supplied it is used as the statement; otherwise the text of the line is used.
    """
    statement = line if cell is None else cell
    result_df = sqlContext.sql(statement)
    return result_df.show(max_show_lines)

@register_line_cell_magic
def sql_display(line, cell=None):
    """Run a SQL statement and return at most ``max_show_lines`` rows as a Pandas DataFrame.

    Usable as ``%sql_display <statement>`` or ``%%sql_display``. The returned
    Pandas DataFrame can be displayed by the notebook or processed further.
    When a cell body is supplied it is used as the statement; otherwise the
    text of the line is used.
    """
    statement = line if cell is None else cell
    limited_df = sqlContext.sql(statement).limit(max_show_lines)
    return limited_df.toPandas()

@register_line_cell_magic
def sql_explain(line, cell=None):
    """Print the execution plan of a SQL statement via ``explain(detailed_explain)``.

    Usable as ``%sql_explain <statement>`` or ``%%sql_explain``. When a cell
    body is supplied it is used as the statement; otherwise the text of the
    line is used.
    """
    statement = line if cell is None else cell
    planned_df = sqlContext.sql(statement)
    return planned_df.explain(detailed_explain)


In [2]:
##############################
###### Load The Delta   ######
##############################
###
### Input delta in folder :  /data 
my_input_delta_table="delta_real_estate_term_definitions"
###
######
##############################Execution##########################
# findspark.init() runs before `import pyspark`; keep this order.
import findspark
findspark.init()
#
import pyspark
from pyspark.sql import functions as pfunc
from pyspark.sql import SQLContext
from pyspark.sql import Window, types
import re
import pandas as pd
import numpy as np
from pandas import DataFrame
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
# NOTE(review): the wildcard import supplies col/explode/split used by later
# cells, but it also shadows Python builtins such as sum/min/max.
from pyspark.sql.functions import *
from scipy.stats import kstest
from scipy import stats
#
import subprocess
#
# Create the Spark entry points used by the %sql* magics and by every cell below.
sc = pyspark.SparkContext(appName="Business_Dictionary-Delta")
sqlContext = SQLContext(sc)
#
# NOTE(review): absolute local path - adjust it when running on another machine.
internal_delta_files="file:///home/joci/notebooks/data/"+my_input_delta_table
#
from pyspark.sql import functions as F
# `version` is only needed for Delta time travel, which is currently disabled.
# To read a specific snapshot, add .option("versionAsOf", version) before .load().
version=1
# Read the Delta table of terms/definitions and keep it cached for the queries below.
delta_dataframe_df1=sqlContext.read.format("delta").load(internal_delta_files)\
.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
#
delta_dataframe_df1.printSchema()
# Expose the DataFrame to SQL as "real_estate_terms".
# createOrReplaceTempView replaces the deprecated registerTempTable.
delta_dataframe_df1.createOrReplaceTempView("real_estate_terms")
#

root
 |-- Real_Estate_Term: string (nullable = true)
 |-- Real_Estate_Definition: string (nullable = true)



In [3]:
%%sql_show
SELECT *
FROM real_estate_terms
LIMIT 20

+--------------------+----------------------+
|    Real_Estate_Term|Real_Estate_Definition|
+--------------------+----------------------+
| Acceleration clause|  Also known as an ...|
|   Active contingent|  When a seller acc...|
|Active under cont...|  A house is listed...|
|            Addendum|  If a buyer or sel...|
|Adjustable-rate m...|  The interest rate...|
|     Adjustment date|  This is the date ...|
|        Amortization|  Amortization is t...|
|Annual percentage...|  The annual percen...|
|           Appraisal|  An appraisal on y...|
|        Appreciation|  Appreciation is t...|
|      Assessed value|  An assessment is ...|
|          Assignment|  An assignment is ...|
|  Assumable mortgage|  Assumption is whe...|
|    Balloon mortgage|  Instead of a trad...|
|  Bi-weekly mortgage|  A bi-weekly mortg...|
|         Bridge loan|  A bridge loan is ...|
|              Broker|  A broker has pass...|
|             Buydown|  A buydown is a mo...|
|         Call option|  A call opt

In [5]:
%%sql_show
-- Terms starting with 'Broker' (removed a stray LIMIT keyword that was being parsed as a table alias)
select * from real_estate_terms where Real_Estate_Term like 'Broker%'

+----------------+----------------------+
|Real_Estate_Term|Real_Estate_Definition|
+----------------+----------------------+
|          Broker|  A broker has pass...|
+----------------+----------------------+



In [33]:
####
#### Expose most frequent Real-estate Terms in definitions
####
# Split every definition on runs of whitespace and emit one row per word.
# The pattern is a raw string so the backslash reaches Spark unchanged instead
# of relying on Python's deprecated handling of the invalid escape "\s".
mywords=sqlContext.sql("select Real_Estate_Definition from real_estate_terms")\
.select(explode(split(col("Real_Estate_Definition"), r"\s+")).alias("real_estate_term"))
###
mywords.printSchema()
####
# Common words excluded from the ranking. Matching is exact, so case variants
# ('the' / 'The') are listed separately and words with attached punctuation
# (e.g. "loan.") are counted as distinct terms.
filler_words_list=['the','a','of','to','is','or','in','on','for','by','an','The','and','A',
                   'your','as','that','when','their','it','be','with','you','are','--','It','from','can','usually']
####
# Count the remaining words, most frequent first.
wordCountDF = mywords\
.filter(~(col("real_estate_term").isin(filler_words_list)))\
.groupBy("real_estate_term").count().orderBy(col('count').desc())
####
wordCountDF.show()

root
 |-- real_estate_term: string (nullable = true)

+----------------+-----+
|real_estate_term|count|
+----------------+-----+
|            loan|   37|
|        mortgage|   32|
|        property|   28|
|          seller|   27|
|           buyer|   22|
|        interest|   19|
|         monthly|   19|
|          lender|   18|
|            home|   17|
|        borrower|   17|
|          estate|   15|
|            real|   14|
|            rate|   13|
|       property.|   11|
|           loan.|   11|
|         payment|   11|
|             pay|   10|
|        payments|   10|
|           owner|   10|
|          amount|   10|
+----------------+-----+
only showing top 20 rows



In [44]:
%%sql_show
SELECT *
FROM real_estate_terms
WHERE Real_Estate_Term LIKE '%mortgage%'

+--------------------+----------------------+
|    Real_Estate_Term|Real_Estate_Definition|
+--------------------+----------------------+
|Adjustable-rate m...|  The interest rate...|
|  Assumable mortgage|  Assumption is whe...|
|    Balloon mortgage|  Instead of a trad...|
|  Bi-weekly mortgage|  A bi-weekly mortg...|
|A conventional mo...|       Convertible ARM|
|A convertible adj...|  Cost of funds ind...|
|A deed-in-lieu of...|               Default|
|If a homeowner de...|           Delinquency|
|A mortgage is con...|       Discount points|
|Discount points a...|          Down payment|
|A due-on-sale cla...|  Earnest money dep...|
|Home equity is th...|                Escrow|
|A fixed-rate mort...|     For sale by owner|
|If a homeowner do...|  Home Equity Conve...|
|The Home Equity C...|  Home equity line ...|
|When you purchase...|  Judicial foreclosure|
|Conforming loan l...|          Lease option|
|Residential loan ...|      Loan origination|
|    No-cost mortgage|  A no-cost 

In [45]:
%%sql_explain
SELECT *
FROM real_estate_terms
WHERE Real_Estate_Term LIKE '%mortgage%'

== Parsed Logical Plan ==
'Project [*]
+- 'Filter 'Real_Estate_Term LIKE %mortgage%
   +- 'UnresolvedRelation `real_estate_terms`

== Analyzed Logical Plan ==
Real_Estate_Term: string, Real_Estate_Definition: string
Project [Real_Estate_Term#662, Real_Estate_Definition#663]
+- Filter Real_Estate_Term#662 LIKE %mortgage%
   +- SubqueryAlias `real_estate_terms`
      +- Relation[Real_Estate_Term#662,Real_Estate_Definition#663] parquet

== Optimized Logical Plan ==
Filter (isnotnull(Real_Estate_Term#662) && Contains(Real_Estate_Term#662, mortgage))
+- InMemoryRelation [Real_Estate_Term#662, Real_Estate_Definition#663], StorageLevel(disk, memory, 2 replicas)
      +- *(1) FileScan parquet [Real_Estate_Term#662,Real_Estate_Definition#663] Batched: true, Format: Parquet, Location: TahoeLogFileIndex[file:/home/joci/notebooks/data/delta_real_estate_term_definitions], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Real_Estate_Term:string,Real_Estate_Definition:string>

== Physical 

In [46]:
%%sql_display
SELECT *
FROM real_estate_terms
WHERE Real_Estate_Term LIKE '%mortgage%'

Unnamed: 0,Real_Estate_Term,Real_Estate_Definition
0,Adjustable-rate mortgage (ARM),The interest rate for an adjustable-rate mortg...
1,Assumable mortgage,Assumption is when a seller transfers all term...
2,Balloon mortgage,Instead of a traditional fixed-rate mortgage i...
3,Bi-weekly mortgage,A bi-weekly mortgage payment means a homeowner...
4,A conventional mortgage is a loan not guarante...,Convertible ARM
5,A convertible adjustable rate mortgage (ARM) a...,Cost of funds index (COFI)
6,A deed-in-lieu of foreclosure is a document tr...,Default
7,"If a homeowner defaults on their loan, it mean...",Delinquency
8,A mortgage is considered delinquent when a sch...,Discount points
9,Discount points are also known as mortgage poi...,Down payment


In [65]:
#######################################################
#########  Evaluate Frequency of Real Estate Terms  ###
#######################################################
#
# Tokenize each definition, derive 4-grams and count vectors, and store the
# enriched DataFrame as a Delta table.
#
nfolder="delta_terms_words_ngrams_real_estate"
# NOTE(review): absolute local path - adjust it when running on another machine.
out_file1="file:///home/joci/notebooks/data/"+nfolder
##
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import RegexTokenizer
#
from pyspark.ml.feature import NGram
#
from collections import Counter
#
from pyspark.ml.feature import NGram
#
from pyspark.ml.feature import NGram, CountVectorizer, VectorAssembler
from pyspark.ml import Pipeline
#
from pyspark.mllib.linalg import SparseVector, DenseVector
#
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
#
from pyspark.sql import functions as F
#
# gaps=False makes the pattern describe the tokens themselves (runs of word
# characters); tokens are lower-cased and anything shorter than 1 char is dropped.
regexTokenizer = RegexTokenizer(minTokenLength=1, gaps=False, pattern='\\w+|', inputCol="Real_Estate_Definition", outputCol="words", toLowercase=True)
#
# Source rows: every term with a non-null name, cached for the transforms below.
delta_dataframe_df4=sqlContext\
.sql("Select Real_Estate_Term, Real_Estate_Definition from real_estate_terms where Real_Estate_Term IS NOT NULL")\
.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
#
tokenized_DF = regexTokenizer.transform(delta_dataframe_df4)
#
ngram = NGram(n=4, inputCol="words", outputCol="ngrams_4")
# CountVectorizer over the 4-grams.
countvector = CountVectorizer(inputCol="ngrams_4", outputCol="ngramscounts_4")
# CountVectorizer over single words. Despite its name this is the unfitted
# estimator; it is fitted below.
countvModel = CountVectorizer(inputCol="words", outputCol="features_15", vocabSize=15, minDF=2.0)
# PCA estimators (k=3). They are only constructed here and never fitted in this
# cell; the original author marked both as too heavy to run.
pcaNgrams = PCA(k=3, inputCol="ngramscounts_4", outputCol="pcaweightngrams")
pcaWords = PCA(k=3, inputCol="features_15", outputCol="pcaweightwords")
#
ngram_frd_DF = ngram.transform(tokenized_DF)
#
ngram_vc_DF = countvector.fit(ngram_frd_DF).transform(ngram_frd_DF)\
.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
# Print the schema of the frame just computed. The original referenced
# ngram_vc_fraud_DF, which is not defined in this notebook and raises
# NameError on a fresh kernel.
ngram_vc_DF.printSchema()
#
result_ngrams_words_DF = countvModel.fit(ngram_vc_DF).transform(ngram_vc_DF)\
.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
# Same fix: the original referenced the undefined result_ngrams_words_fraud_DF.
result_ngrams_words_DF.printSchema()
#
# NOTE(review): no save mode is set, so the writer's default applies; confirm
# whether re-running this cell against an existing output folder should
# overwrite it (.mode("overwrite")) or fail.
result_ngrams_words_DF.coalesce(1).write.format("delta").save(out_file1)
#
print("Calculation of most frequent mortage Ngram term - Finished!")

root
 |-- Real_Estate_Term: string (nullable = true)
 |-- Real_Estate_Definition: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngrams_4: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- ngramscounts_4: vector (nullable = true)

root
 |-- Real_Estate_Term: string (nullable = true)
 |-- Real_Estate_Definition: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngrams_4: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- ngramscounts_4: vector (nullable = true)
 |-- features_15: vector (nullable = true)

Calculation of most frequent mortage Ngram term - Finished!


In [66]:
#####################################################
###### Real Estate Term - Ngram4 Most-Frequent  #####
#####################################################
#
# Reload the Delta table of tokenized definitions / 4-grams and expose it to SQL.
#
delta_real_estate_term_folder="delta_terms_words_ngrams_real_estate"
# NOTE(review): absolute local path - adjust it when running on another machine.
input_delta="file:///home/joci/notebooks/data/"+delta_real_estate_term_folder
#
from pyspark.sql import functions as F
# `version` is only needed for Delta time travel, which is currently disabled.
# To read a specific snapshot, add .option("versionAsOf", version) before .load().
version=1
# NOTE(review): this rebinds delta_dataframe_df1; any DataFrame previously held
# under that name can no longer be reached (e.g. to unpersist it) through it.
delta_dataframe_df1=sqlContext.read.format("delta").load(input_delta)\
.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
#
delta_dataframe_df1.printSchema()
# Expose the DataFrame to SQL as "real_estate_ngram4_terms".
# createOrReplaceTempView replaces the deprecated registerTempTable.
delta_dataframe_df1.createOrReplaceTempView("real_estate_ngram4_terms")

root
 |-- Real_Estate_Term: string (nullable = true)
 |-- Real_Estate_Definition: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngrams_4: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngramscounts_4: vector (nullable = true)
 |-- features_15: vector (nullable = true)



In [69]:
#### Most frequent 4-grams found in the real-estate definitions
####
# One output row per 4-gram: explode the ngrams_4 array column of the temp view.
mywords = (
    sqlContext
    .sql("select real_estate_ngram4_terms.ngrams_4 as real_ngram4 from real_estate_ngram4_terms")
    .select(explode(col("real_ngram4")).alias("real_estate_term_ngram4"))
)
mywords.printSchema()

# Values to exclude from the ranking (exact string match).
filler_words_list = ['[]']

# Occurrences per 4-gram, highest count first.
wordCountDF = (
    mywords
    .filter(~col("real_estate_term_ngram4").isin(filler_words_list))
    .groupBy("real_estate_term_ngram4")
    .count()
    .orderBy(col("count").desc())
)

# Up to 100 rows, with column truncation disabled (truncate=0).
wordCountDF.show(100, truncate=0)

root
 |-- real_estate_term_ngram4: string (nullable = true)

+---------------------------------+-----+
|real_estate_term_ngram4          |count|
+---------------------------------+-----+
|an offer from a                  |2    |
|a real estate agent              |2    |
|much a home is                   |2    |
|is a type of                     |2    |
|of real estate transactions      |2    |
|the buyer and seller             |2    |
|the amount you owe               |2    |
|a no cash out                    |2    |
|commonly referred to as          |2    |
|of a property s                  |2    |
|also known as a                  |2    |
|a bank or financial              |2    |
|the seller to the                |2    |
|to a lender if                   |2    |
|is the amount of                 |2    |
|also commonly referred to        |2    |
|from the seller to               |2    |
|to the borrower s                |2    |
|owner of the property            |2    |
|lowering the b

In [13]:
# Shut down the SparkContext `sc` used by this notebook; run this last.
sc.stop()