# Cleaning data with PySpark

## Start spark context

In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
print(sc)
print(sqlContext)

<pyspark.context.SparkContext object at 0x7fcdcaf679e8>
<pyspark.sql.context.SQLContext object at 0x7fcdcaf67978>


In [45]:
from pyspark.sql import SparkSession
from pyspark.sql import types as stypes
from pyspark.sql.functions import udf
from pyspark_dist_explore import hist, distplot
import matplotlib.pyplot as plt
%matplotlib inline
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL cleaning data") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
from datetime import datetime
from datetime import date

## Posts import

In [4]:
fields = [
    stypes.StructField("Id", stypes.IntegerType(), True),\
    stypes.StructField("PostTypeId", stypes.IntegerType(), True),\
    stypes.StructField("AcceptedAnswerId", stypes.IntegerType(), True),\
    stypes.StructField("ParentId", stypes.IntegerType(), True),\
    stypes.StructField("CreationDate", stypes.StringType(), True),\
    stypes.StructField("DeletionDate", stypes.StringType(), True),\
    stypes.StructField("Score", stypes.IntegerType(), True),\
    stypes.StructField("ViewCount", stypes.IntegerType(), True),\
    stypes.StructField("Body", stypes.StringType(), True),\
    stypes.StructField("OwnerUserId", stypes.IntegerType(), True),\
    stypes.StructField("OwnerDisplayName", stypes.StringType(), True),\
    stypes.StructField("LastEditorUserId", stypes.IntegerType(), True),\
    stypes.StructField("LastEditorDisplayName", stypes.StringType(), True),\
    stypes.StructField("LastEditDate", stypes.StringType(), True),\
    stypes.StructField("LastActivityDate", stypes.StringType(), True),\
    stypes.StructField("Title", stypes.StringType(), True),\
    stypes.StructField("Tags", stypes.StringType(), True),\
    stypes.StructField("AnswerCount", stypes.IntegerType(), True),\
    stypes.StructField("CommentCount", stypes.IntegerType(), True),\
    stypes.StructField("FavoriteCount", stypes.IntegerType(), True),\
    stypes.StructField("ClosedDate", stypes.StringType(), True),\
    stypes.StructField("CommunityOwnedDate", stypes.StringType(), True)
]
customSchema = stypes.StructType(fields)
posts = sqlContext.read\
.format('com.databricks.spark.csv')\
.load("/home/mat/Escritorio/ejecucion_systemml/jupyter_notebook/data/posts_csv/part-00000",\
      schema=customSchema)

In [5]:
#posts.toPandas()

In [6]:
reduced_posts = posts.drop('ParentId','Score','ViewCount','Body','OwnerDisplayName',\
                         'LastEditorUserId','LastEditorDisplayName','LastEditDate','LastActivityDate','AnswerCount',\
                         'CommentCount','FavoriteCount','CommunityOwnedDate','DeletionDate')

In [7]:
#reduced_posts.describe().toPandas()

### UDF to transform questions columns

In [8]:
parse_time_udf = udf(lambda x: \
    float((datetime.strptime(x.split(".")[0], '%Y-%m-%dT%H:%M:%S') - datetime.min).total_seconds()),\
    stypes.FloatType()
)

parse_time_check_udf = udf(lambda x:\
    float((datetime.strptime(x.split(".")[0], '%Y-%m-%dT%H:%M:%S') - datetime.min).total_seconds())\
    if x is not None else None,\
    stypes.FloatType()
)

tag_counter_udf = udf(lambda x:\
    float(len([tag.split("&lt;")[1] for tag in x.split("&gt;")[0:-1]])),\
    stypes.FloatType()
)

title_length_udf = udf(lambda x:\
    float(len(x)),\
    stypes.FloatType()
)

## Questions import

In [9]:
questions = reduced_posts.filter(reduced_posts.PostTypeId == 1)\
.drop("PostTypeId").drop("OwnerUserId")
questions = questions.\
withColumn("ParsedCreationDate", parse_time_udf(questions.CreationDate)).\
withColumn("ParsedClosedDate", parse_time_check_udf(questions.ClosedDate)).\
withColumn("TagCount", tag_counter_udf(questions.Tags)).\
withColumn("TitleLength", title_length_udf(questions.Title)).\
drop("CreationDate","ClosedDate", "Tags", "Title")

In [10]:
#questions.toPandas()

In [11]:
#questions.describe().toPandas()

In [12]:
min_time = questions.select(questions.ParsedCreationDate).rdd.min()[0]
print(min_time)

63430377472.0


# Answers import

In [13]:
answers = reduced_posts.filter(reduced_posts.PostTypeId == 2)\
.drop("AcceptedAnswerId", "Title", "Tags", "ClosedDate","PostTypeId")
answers = answers.withColumn("AnsParsedCreationDate", parse_time_udf(answers.CreationDate)).drop(answers.CreationDate)

In [14]:
#answers.describe().toPandas()

In [15]:
answers.registerTempTable("answers")
questions.registerTempTable("questions")

In [16]:
accepted_answers = sqlContext\
.sql("SELECT * FROM answers WHERE answers.Id IN (SELECT AcceptedAnswerId FROM questions)")\

accepted_answers = accepted_answers\
.withColumnRenamed("Id", "AnswerId")\

In [17]:
#accepted_answers.toPandas()

In [18]:
max_time = accepted_answers.select(accepted_answers.AnsParsedCreationDate).rdd.max()[0]
print(max_time)

63592824832.0


## Users import

In [19]:
fields = [
    stypes.StructField("Id", stypes.IntegerType(), True),\
    stypes.StructField("Reputation", stypes.FloatType(), True),\
    stypes.StructField("CreationDate", stypes.StringType(), True),\
    stypes.StructField("DisplayName", stypes.StringType(), True),\
    stypes.StructField("LastAccessDate", stypes.StringType(), True),\
    stypes.StructField("WebsiteUrl", stypes.StringType(), True),\
    stypes.StructField("Location", stypes.StringType(), True),\
    stypes.StructField("AboutMe", stypes.StringType(), True),\
    stypes.StructField("Views", stypes.IntegerType(), True),\
    stypes.StructField("UpVotes", stypes.IntegerType(), True),\
    stypes.StructField("DownVotes", stypes.IntegerType(), True),\
    stypes.StructField("Age", stypes.FloatType(), True),\
    stypes.StructField("AccountId", stypes.IntegerType(), True),\
]
customSchema = stypes.StructType(fields)
users = sqlContext.read\
.format('com.databricks.spark.csv')\
.load("/home/mat/Escritorio/ejecucion_systemml/jupyter_notebook/data/users_csv/part-00000",\
      schema=customSchema)
users.describe().toPandas()
users = users.drop("CreationDate","DisplayName","LastAccessDate","WebsiteUrl","Location","AboutMe","Views","UpVotes",\
                   "DownVotes","AccountId")
users = users.withColumnRenamed("Id","UserId").withColumnRenamed("Reputation","UserReputation")

In [20]:
users.describe().toPandas()

Unnamed: 0,summary,UserId,UserReputation,Age
0,count,43360.0,43360.0,12901.0
1,mean,31417.35920202952,171.96042435424354,33.28548174560112
2,stddev,19202.403149027137,2012.982474680336,10.22403146691696
3,min,-1.0,1.0,14.0
4,max,62923.0,198942.0,96.0


# Join for final dataframe

### First join answers with users

In [21]:
join_df = questions.join(accepted_answers, accepted_answers.AnswerId == questions.AcceptedAnswerId, 'full')
join_df = join_df.join(users, join_df.OwnerUserId == users.UserId, "left").\
drop("AcceptedAnswerId","UserId","OwnerUserId", "AnswerId", "Id")

In [22]:
#join_df.toPandas()

In [23]:
#join_df.describe().toPandas()

## Final formatting and adding censored variable and final time

In [24]:
def final_time(ParsedCreationDate, ParsedClosedDate, AnsParsedCreationDate):

    if(AnsParsedCreationDate):
        return AnsParsedCreationDate-ParsedCreationDate
    elif(ParsedClosedDate):
        return ParsedClosedDate-ParsedCreationDate
    else:
        return max_time-ParsedCreationDate

final_time_udf = udf(final_time, stypes.FloatType())

def censoring_status(ParsedCreationDate, ParsedClosedDate, AnsParsedCreationDate):
    
    if(AnsParsedCreationDate):
        return float(1)
    elif(ParsedClosedDate):
        return float(0)
    else:
        return float(0)
    
censoring_status_udf = udf(censoring_status, stypes.FloatType())

In [25]:
final_df = join_df.\
withColumn("FinalTime",\
final_time_udf(join_df.ParsedCreationDate, join_df.ParsedClosedDate, join_df.AnsParsedCreationDate)).\
withColumn("CensoringStatus",\
censoring_status_udf(join_df.ParsedCreationDate, join_df.ParsedClosedDate, join_df.AnsParsedCreationDate)).\
drop("ParsedCreationDate", "ParsedClosedDate", "AnsParsedCreationDate")

In [26]:
#final_df.describe().toPandas()

## Filtering invalid results

In [27]:
final_filtered_df = final_df.filter(final_df.FinalTime > float(0))

In [28]:
final_filtered_df.describe().toPandas()

Unnamed: 0,summary,TagCount,TitleLength,UserReputation,Age,FinalTime,CensoringStatus
0,count,24009.0,24009.0,12425.0,5554.0,24009.0,24009.0
1,mean,2.058894581198717,59.9233204215086,46809.372796780684,37.44382427079582,21958900.47365571,0.5387146486734141
2,stddev,0.9926152490165466,22.63782239313445,62553.06355062486,9.477650292365588,35532408.06671485,0.4985093045706418
3,min,1.0,15.0,1.0,14.0,4096.0,0.0
4,max,5.0,168.0,198942.0,96.0,162443264.0,1.0


In [29]:
#final_filtered_df.select("FinalTime","CensoringStatus","TagCount","TitleLength","UserReputation","Age")\
#.toPandas().to_csv("/Users/Mat/Desktop/final_df")

In [30]:
#final_filtered_df.select("FinalTime","CensoringStatus","TagCount","TitleLength","UserReputation","Age").\
#write.csv("/Users/Mat/Desktop/final_df_float")

## Grouping the categorical variables

In [31]:
pd_age_distinct = final_filtered_df.select("Age").distinct().toPandas()
pd_age = final_filtered_df.select("Age").toPandas()

In [32]:
pd_age.count()

Age    5554
dtype: int64

In [33]:
print(pd_age.where(pd_age["Age"]>50).count())
print(pd_age.where(pd_age["Age"]<50).count())

Age    535
dtype: int64
Age    4991
dtype: int64


In [34]:
pd_final = final_filtered_df.toPandas()
pd_final.cov()

Unnamed: 0,TagCount,TitleLength,UserReputation,Age,FinalTime,CensoringStatus
TagCount,0.985285,1.359731,-499.5387,0.08048571,-661007.6,0.00817467
TitleLength,1.359731,512.471,-92404.93,12.9244,57813330.0,-0.7737932
UserReputation,-499.538654,-92404.93,3912886000.0,90838.65,10017800000.0,0.0
Age,0.080486,12.9244,90838.65,89.82586,-2053477.0,0.0
FinalTime,-661007.593042,57813330.0,10017800000.0,-2053477.0,1262552000000000.0,-10615960.0
CensoringStatus,0.008175,-0.7737932,0.0,0.0,-10615960.0,0.2485115


In [35]:
pd_final.plot([pd_final["CensoringStatus"],pd_final["FinalTime"]])

<matplotlib.axes._subplots.AxesSubplot at 0x7fcda9df5ef0>

In [36]:
type(pd_final.Age)

pandas.core.series.Series

In [37]:
pd_final.Age.values.tolist()[3]

nan

In [38]:
pd_final.describe()

Unnamed: 0,TagCount,TitleLength,UserReputation,Age,FinalTime,CensoringStatus
count,24009.0,24009.0,12425.0,5554.0,24009.0,24009.0
mean,2.058895,59.92332,46809.372797,37.443824,21958900.0,0.538715
std,0.992615,22.637822,62553.063551,9.47765,35532410.0,0.498509
min,1.0,15.0,1.0,14.0,4096.0,0.0
25%,1.0,43.0,2532.0,30.0,8192.0,0.0
50%,2.0,57.0,14130.0,37.0,208896.0,1.0
75%,3.0,73.0,66793.0,44.0,32763900.0,1.0
max,5.0,168.0,198942.0,96.0,162443300.0,1.0


In [39]:
pd_final.TagCount.hist()

<matplotlib.axes._subplots.AxesSubplot at 0x7fcda9df5ef0>

In [40]:
print(pd_final.TitleLength.where(pd_final.TitleLength<100).count())
print(pd_final.TitleLength.where(pd_final.TitleLength>100).count())

22678
1239


In [41]:
pd_final.TitleLength.hist(bins=200)

<matplotlib.axes._subplots.AxesSubplot at 0x7fcda9df5ef0>

In [42]:
pd_final.TitleLength.hist(bins=10)

<matplotlib.axes._subplots.AxesSubplot at 0x7fcda9df5ef0>

In [43]:
pd_final.UserReputation.describe()

count     12425.000000
mean      46809.372797
std       62553.063551
min           1.000000
25%        2532.000000
50%       14130.000000
75%       66793.000000
max      198942.000000
Name: UserReputation, dtype: float64

In [44]:
plt.scatter(pd_final.UserReputation.values.tolist(), pd_final.FinalTime.values.tolist())

NameError: name 'plt' is not defined

In [None]:
plt.scatter(pd_final.TitleLength.values.tolist(), pd_final.FinalTime.values.tolist())

In [None]:
plt.scatter(pd_final.TagCount.values.tolist(), pd_final.FinalTime.values.tolist())

In [None]:
pd_age.where(pd_age["Age"]<50).hist(bins=200)

In [None]:
fig, ax = plt.subplots()
fig.set_size_inches(20, 20)
hist(ax, final_filtered_df.select(final_filtered_df.Age), bins = 200, color=['blue'])

# Exploratory analysis over the data

In [None]:
from pyspark_dist_explore import hist, distplot
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
#fig, ax = plt.subplots()
#hist(ax, final_filtered_df.select(final_filtered_df.Age), bins = 20, color=['blue'])

In [None]:
#fig, ax = plt.subplots()
#fig.set_size_inches(20, 20)
#hist(ax, final_filtered_df.select(final_filtered_df.UserReputation), bins = 200, color=['blue'])

In [None]:
#fig, ax = plt.subplots()
#fig.set_size_inches(20, 20)
#hist(ax, final_filtered_df.select(final_filtered_df.FinalTime).where(final_filtered_df.FinalTime<20000),\
#     bins = 200, color=['blue'])