In [41]:
from pyspark.sql import SQLContext
import pandas as pd
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import readability
import syntok.segmenter as segmenter
from pyspark.sql import functions as F

In [2]:
# Reuse the active SparkContext / SparkSession when one already exists, so this
# cell can be re-run in a live kernel; a second bare SparkContext() raises
# because only one context may be active at a time.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()

In [3]:
def run_sql(statement):
    """Run a SQL statement through the notebook's ``sqlContext``.

    Args:
        statement: SQL text to execute.

    Returns:
        Whatever ``sqlContext.sql(statement)`` returns, or ``None`` when the
        call raises; in that case the error details are printed instead of
        being propagated.
    """
    try:
        return sqlContext.sql(statement)
    except Exception as e:
        # Only some PySpark exceptions expose `desc` / `stackTrace`; fall back
        # to the plain message so reporting never raises an AttributeError that
        # would hide the original failure.
        description = getattr(e, 'desc', str(e))
        stack_trace = getattr(e, 'stackTrace', '')
        print(description, '\n', stack_trace)
        return None

In [4]:
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE DATABASE fails once
# the database has already been created by an earlier run.
run_sql('create database if not exists review_matrix')
dbs = run_sql('show databases')

In [5]:
# Load the review CSV (header row, inferred schema) via the 'com.databricks.spark.csv' format name.
# NOTE(review): `df` is reassigned by the next cell's read of the same file, so this load looks redundant — confirm before removing.
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data/review_df.csv')

In [6]:
# Read the review CSV with the "csv" format: the first row is the header,
# the schema is inferred, and multiLine parsing is enabled.
df = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="true", multiLine="true")
    .load("data/review_df.csv")
)

In [7]:
# List the (column, type) pairs of the loaded DataFrame.
# NOTE(review): the output below reports every column as string even though inferSchema is on — numeric columns may need an explicit cast; verify.
df.dtypes

[('stars', 'string'),
 ('text', 'string'),
 ('useful', 'string'),
 ('nb_days', 'string'),
 ('log_useful', 'string')]

In [8]:
# Print the schema as a tree, including each column's nullability.
df.printSchema()

root
 |-- stars: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: string (nullable = true)
 |-- nb_days: string (nullable = true)
 |-- log_useful: string (nullable = true)



In [9]:
# Display the first five rows of the loaded DataFrame.
df.show(5)

+-----+--------------------+------+-------+------------------+
|stars|                text|useful|nb_days|        log_useful|
+-----+--------------------+------+-------+------------------+
|    5|Went in for a lun...|     0|    305|               0.0|
|    1|Really one of dir...|     0|   1158|               0.0|
|    1|Terrible place to...|     1|   1628|0.6931471805599453|
|    5|One of the best I...|     1|   1252|0.6931471805599453|
|    5|A favorite amongs...|     0|    932|               0.0|
+-----+--------------------+------+-------+------------------+
only showing top 5 rows



In [72]:
def rename_cols(key, d):
    """Collect the values of one feature category from a nested dictionary.

    Every inner key of ``d[key]`` is prefixed with ``key`` and a space to form
    a labelled mapping, which is then passed through ``pandas.Series``. Only
    the values are returned, as a plain list in the inner dictionary's order;
    the labelled names themselves are not part of the result. Because the
    values go through a Series, mixed int/float values come back as floats.

    Args:
        key: Name of the category (outer key) to extract.
        d: Dictionary mapping category names to inner dictionaries.

    Returns:
        List of the values stored under ``d[key]``.
    """
    labelled = {key + " " + inner: value for inner, value in d[key].items()}
    return pd.Series(labelled).tolist()
def unpack(t):
    """Compute readability features for one text and return them as a flat list.

    The text is tokenized with ``syntok``: tokens are joined by spaces,
    sentences by newlines and paragraphs by blank lines. The tokenized text is
    passed to ``readability.getmeasures``, which returns a dictionary of
    dictionaries keyed by feature category. The values of every category are
    concatenated, in category order, into a single list.

    Args:
        t: The text to analyse; may be ``None`` or blank.

    Returns:
        A flat list of feature values, or ``None`` when ``t`` is ``None`` or
        contains only whitespace.
    """
    # The text column is nullable, and there is nothing to measure in blank
    # text; return None (a null array in Spark) instead of raising mid-job.
    if t is None or not t.strip():
        return None

    tokenized = '\n\n'.join(
        '\n'.join(' '.join(token.value for token in sentence)
                  for sentence in paragraph)
        for paragraph in segmenter.analyze(t))
    nested_feature_dict = readability.getmeasures(tokenized, lang='en')

    # Flatten the per-category value lists into one list, keeping category order.
    return [item
            for category in nested_feature_dict
            for item in rename_cols(category, nested_feature_dict)]
    


In [53]:
# Take a two-row subset of df and display it.
sub_df = df.limit(2)
sub_df.show(2)

+-----+--------------------+------+-------+----------+
|stars|                text|useful|nb_days|log_useful|
+-----+--------------------+------+-------+----------+
|    5|Went in for a lun...|     0|    305|       0.0|
|    1|Really one of dir...|     0|   1158|       0.0|
+-----+--------------------+------+-------+----------+



In [73]:
# Wrap unpack as a Spark UDF declared to return an array of strings, then
# store its result for each row's text in a new 'subset' column.
to_list_udf = udf(f=unpack, returnType=ArrayType(StringType()))
f = sub_df.withColumn('subset', to_list_udf(sub_df['text']))

In [74]:
# Number of entries in the 'subset' array of the first row; used as the feature count.
# NOTE(review): assumes every row yields an array of the same length as the first — confirm.
length = len(f.select('subset').take(1)[0][0])
length

35

In [75]:
# Expand the 'subset' array into one column per element, alongside text and log_useful.
# NOTE(review): this rebinds `df`, replacing the DataFrame loaded from the CSV with the
# derived frame — re-running earlier cells that read `df` will then see different data.
df = f.select(['text', 'log_useful']  + [f.subset[i] for i in range(length)])
df.show()

+--------------------+----------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+----------+------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|                text|log_useful|        subset[0]|        subset[1]|         subset[2]|        subset[3]|        subset[4]|         subset[5]|        subset[6]|         subset[7]|        subset[8]|         subset[9]|        subset[10]|       subset[11]|subset[12]|        subset[13]|subset[14]|subset[15]|subset[16]|subset[17]|subset[18]|subset[19]|subset[20]|subset[21]|subset[22]|subset[23]|subset[24]|subset[25]|subset[26]|subset[27]|subset[28]|subset[29]|subset[30]|subset[31]|su