In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as func
from pyspark.sql.functions import col
from pyspark.sql.functions import sum
from pyspark.sql.functions import when
from pyspark.sql.functions import explode
from pyspark.sql.functions import unix_timestamp
import datetime

In [2]:
# Reuse the running SparkContext if one exists. A plain SparkContext(...) raises
# ValueError when this cell is re-run on a live kernel.
# Note: if a context already exists, its original configuration (including appName) is kept.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("Tags"))
sqlContext = SQLContext(sc)

In [44]:
# Raw Stack Exchange posts, loaded through the generic reader API.
df = sqlContext.read.format("parquet").load("./data/results/Posts_parquet/Posts.parquet")

In [45]:
# Keep only the columns needed for tag analysis, with typed id and dates.
tags_df = df.select(
    col("Id").cast("int"),
    col("Tags"),
    col("CreationDate").cast("date"),
    col("LastEditDate").cast("date"),
)
tags_df.limit(5).toPandas()

Unnamed: 0,Id,Tags,CreationDate,LastEditDate
0,1,<mysql><innodb><myisam>,2011-01-03,
1,2,<mysql><version-control><schema>,2011-01-03,2011-01-06
2,3,<database-design><erd>,2011-01-03,2011-01-06
3,4,,2011-01-03,2013-09-23
4,5,<nosql><rdbms><database-recommendation>,2011-01-03,2011-01-06


In [50]:
# Regex -> canonical database name, consumed by standardize_tags.
# Keys are regular expressions matched case-insensitively. A trailing ".*" also
# covers suffixed variants such as "mysql-5.7" or "sql-server-2008-r2".
# Entry order is kept as-is because the standardizer walks the keys in order.
tag_dict = {
    "mysql.*": "mysql",
    "innodb": "mysql",
    "myisam": "mysql",
    "sql-server.*": "sql-server",
    "ssms": "sql-server",
    "ssis": "sql-server",
    "t-sql": "sql-server",
    "postgresql.*": "postgres",
    "oracle.*": "oracle",
    # NOTE(review): heuristic - generic document-store questions are counted as MongoDB.
    "document-oriented": "mongodb",
    # phpMyAdmin is the MySQL/MariaDB web admin tool (previously mis-mapped to postgres).
    "phpmyadmin": "mysql",
    "plsql": "oracle",
    "plpgsql": "postgres",
    "sqlplus": "oracle",
    "rman": "oracle",
    "pgadmin": "postgres",
    # NOTE(review): heuristic - Windows Server questions are counted as SQL Server.
    "windows-server.*": "sql-server",
    "pg-dump": "postgres",
    "psql": "postgres",
    "postgis": "postgres",
    # NOTE(review): heuristic - generic NoSQL questions are counted as MongoDB.
    "nosql": "mongodb",
    "mongo.*": "mongodb",
}

In [51]:
def standardize_tags(tag_list, mapping=None):
    """Map each tag onto its canonical database name.

    Every key of ``mapping`` is a regular expression that must match the
    *whole* tag, case-insensitively. The first matching key, in the mapping's
    iteration order, decides the replacement. Tags that match no key are
    returned unchanged.

    Args:
        tag_list: iterable of tag strings, e.g. ``['mysql-5.5', 'innodb']``.
        mapping: optional ``{regex: canonical_name}`` dict; defaults to the
            module-level ``tag_dict``.

    Returns:
        A new list with one entry per input tag, in the same order.
    """
    import re

    if mapping is None:
        mapping = tag_dict
    patterns = [(re.compile(key, re.IGNORECASE), value) for key, value in mapping.items()]

    def canonical(tag):
        # fullmatch rather than sub: sub rewrote matching substrings anywhere in
        # the tag, e.g. 'performance' (contains 'rman') became 'perfooraclece'.
        for pattern, value in patterns:
            if pattern.fullmatch(tag):
                return value
        return tag

    return [canonical(tag) for tag in tag_list]

In [46]:
# Switch to the RDD API for the row-wise tag parsing that follows.
tags_rdd = tags_df.rdd
tags_rdd.count()

127212

In [48]:
# Drop rows whose Tags value is null; identity check is the idiomatic None test.
tags_not_empty = tags_rdd.filter(lambda x: x[1] is not None)
tags_not_empty.count()

53664

In [55]:
def _normalize_tag_markup(raw_tags):
    """Rewrite '<a><b><c>' as 'a, b, c' by dropping the angle-bracket markup."""
    return raw_tags.replace('><', ', ').replace('<', '').replace('>', '')


# Each row is (Id, Tags, CreationDate, LastEditDate); only the Tags slot changes.
tags_clean = tags_not_empty.map(
    lambda row: (row[0], _normalize_tag_markup(row[1]), row[2], row[3]))

tags_split = tags_clean.map(
    lambda row: (row[0], [piece.strip() for piece in row[1].split(',')], row[2], row[3]))

tags_standardized = tags_split.map(
    lambda row: (row[0], standardize_tags(row[1]), row[2], row[3]))

# Collapse duplicates created when several raw tags map to one canonical name.
tags_unique = tags_standardized.map(
    lambda row: (row[0], set(row[1]), row[2], row[3]))

tags_unique.take(5)

[(1, {'mysql'}, datetime.date(2011, 1, 3), None),
 (2,
  {'mysql', 'schema', 'version-control'},
  datetime.date(2011, 1, 3),
  datetime.date(2011, 1, 6)),
 (3,
  {'database-design', 'erd'},
  datetime.date(2011, 1, 3),
  datetime.date(2011, 1, 6)),
 (5,
  {'database-recommendation', 'mongodb', 'rdbms'},
  datetime.date(2011, 1, 3),
  datetime.date(2011, 1, 6)),
 (6,
  {'postgres', 'replication'},
  datetime.date(2011, 1, 3),
  datetime.date(2011, 8, 16))]

In [58]:
# The canonical database names that tags are standardized onto.
unique_tags = {canonical for canonical in tag_dict.values()}
unique_tags

{'mongodb', 'mysql', 'oracle', 'postgres', 'sql-server'}

In [None]:
# Keep only the canonical database tags; drop posts that carry none of them.
post_tags_meaningful = (
    tags_unique
    .map(lambda row: (row[0], row[1] & unique_tags, row[2], row[3]))
    .filter(lambda row: len(row[1]) > 0)
)
post_tags_meaningful.take(20)

In [None]:
# Number of meaningful posts whose LastEditDate is null.
post_tags_meaningful.filter(lambda x: x[3] is None).count()

In [None]:
# Posts tagged with at least two canonical databases.
common_tags = post_tags_meaningful.filter(lambda row: len(row[1]) >= 2)
common_tags.take(20)

In [None]:
# createDataFrame's ArrayType field accepts lists/tuples, not sets, so convert the tag sets.
post_tags_to_write = post_tags_meaningful \
    .map(lambda x: (x[0], list(x[1]), x[2], x[3]))

fields = [
    StructField('Id', IntegerType(), True),
    StructField('Tags', ArrayType(StringType(), True), True),
    StructField('CreationDate', DateType(), True),
    StructField('LastEditDate', DateType(), True)]
schema = StructType(fields)

# New name instead of rebinding `df`: `df` still holds the raw posts that the
# earlier select cell reads, so re-running that cell stays correct.
# samplingRatio is omitted because it only drives schema inference, and the schema is explicit.
posts_clean_df = sqlContext.createDataFrame(post_tags_to_write, schema)

# NOTE(review): never-edited posts get today's date, so this column depends on
# the day the notebook is run.
df_filled = posts_clean_df \
    .withColumn('LastEditDate', when(
        col('LastEditDate').isNull(), datetime.date.today()).otherwise(col('LastEditDate')))
df_filled.show()

In [None]:
# Single part file; overwrite so that re-running the notebook does not fail on an existing directory.
df_filled.coalesce(1).write.parquet("data/results/Posts_clean_parquet", mode="overwrite")

In [1]:
# The write cell produces a directory of part files at this path; read the
# directory itself (there is no Posts.parquet inside it).
posts = sqlContext.read.parquet("./data/results/Posts_clean_parquet")
posts.limit(5).toPandas()

NameError: name 'sqlContext' is not defined

In [2]:
# Inspect one post before exploding its tag array.
display(posts.filter('Id=96').limit(1).toPandas())

# One row per (post, canonical tag). Bound to a new name so re-running the cell
# does not try to explode the already-exploded frame, which has no 'Tags' column.
post_tags = posts.select('Id', explode(col('Tags')).alias('Tag'), 'CreationDate', 'LastEditDate')
post_tags.filter('Id > 93 AND Id < 98').show(8)

posts_for_df = post_tags.filter('Tag = "sql-server"').sort(col('CreationDate'))

df = posts_for_df.toPandas()
df.dtypes

import pandas as pd
from matplotlib import pyplot
%matplotlib inline

df

df['CreationDate']=pd.to_datetime(df['CreationDate'], format= '%Y-%m-%d')
df.dtypes

df.plot('CreationDate', 'Id')

df.hist?

# df.groupby([df["CreationDate"].dt.year, df["CreationDate"].dt.month]).count().plot(kind="bar")
df \
.groupby(df["CreationDate"].dt.year).count().plot(kind="bar")
# .select("Id", "CreationDate") \
# .groupby(df["CreationDate"].dt.year).count().plot(kind="bar")

pyplot.hist([1,11,21,31,41], bins=[0,10,20,30,40,50], weights=[10,1,40,33,6]);

NameError: name 'posts' is not defined