In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [None]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = SparkSession.builder.appName("Stack Overflow Data Wrangling").getOrCreate()


In [3]:
# Load the three raw CSV exports with identical reader settings: first row is
# the header, column types are inferred, '"' is the escape character, and
# quoted fields may span several lines.
questions, answers, users = (
    spark.read.csv(csv_path, header=True, inferSchema=True, escape='"', multiLine=True)
    for csv_path in ("questions.csv", "answers.csv", "users.csv")
)

In [5]:
# Disambiguate column names shared between the answers, questions and users
# frames by appending "_<frame name>" to the clashing column.
# The passes run in a fixed order (answers, questions, users) and each one
# compares against the *current* names of the other two frames, so a name that
# an earlier pass already suffixed elsewhere no longer counts as a clash.

names_elsewhere = set(questions.columns + users.columns)
for name in answers.columns:
    if name in names_elsewhere:
        answers = answers.withColumnRenamed(name, name + "_answers")

names_elsewhere = set(answers.columns + users.columns)
for name in questions.columns:
    if name in names_elsewhere:
        questions = questions.withColumnRenamed(name, name + "_questions")

names_elsewhere = set(questions.columns + answers.columns)
for name in users.columns:
    if name in names_elsewhere:
        users = users.withColumnRenamed(name, name + "_users")

In [6]:
def return_country(col):
    '''
    Return the part of a comma-separated location that names the Netherlands.

    Parameters
    ----------
    col : str or None
        One location value, e.g. "Amsterdam, Netherlands".

    Returns
    -------
    str or None
        The first comma-separated part containing "Netherlands", with
        surrounding whitespace removed (so "Amsterdam, Netherlands" gives
        "Netherlands", not " Netherlands"). None when the input is None or
        no part mentions the Netherlands.
    '''
    # A null location reaches the function as None; there is nothing to split.
    if col is None:
        return None
    for word in col.split(','):
        if "Netherlands" in word:
            # Parts after a comma carry a leading space; trim it so the
            # country value is consistent across rows.
            return word.strip()
    return None
def return_city(col):
    '''
    Return the city part of a comma-separated location.

    Parameters
    ----------
    col : str or None
        One location value, e.g. "Amsterdam, Netherlands".

    Returns
    -------
    str or None
        The first non-empty comma-separated part that does not contain
        "Netherlands", with surrounding whitespace removed. None when the
        input is None or every part is empty or mentions the Netherlands
        (e.g. a location of just "Netherlands").
    '''
    # A null location reaches the function as None; there is nothing to split.
    if col is None:
        return None
    for word in col.split(','):
        part = word.strip()
        # Skip blank parts (e.g. from ", Netherlands") so an empty string is
        # never reported as a city.
        if part and 'Netherlands' not in part:
            return part
    return None

In [7]:
# Wrap the plain-Python location parsers as Spark UDFs that produce string columns.
udf_return_country = udf(return_country, returnType=StringType())
udf_return_city = udf(return_city, returnType=StringType())

In [8]:
# Register the users frame as the temp view "new_users" so it can be queried with Spark SQL.
users.createOrReplaceTempView('new_users')

In [9]:
# Keep only the users whose location text mentions the Netherlands,
# then print the full (untruncated) location values for inspection.
new_users = spark.sql(
    "SELECT * FROM new_users WHERE location LIKE '%Netherlands%'"
)
new_users.select("location").show(truncate=False)

+----------------------------------------------+
|location                                      |
+----------------------------------------------+
|Driebergen, Driebergen-Rijsenburg, Netherlands|
|Amsterdam, Netherlands                        |
|Netherlands                                   |
|Netherlands                                   |
|Netherlands                                   |
|Netherlands                                   |
|Amsterdam, Netherlands                        |
|The Netherlands                               |
|Amsterdam, Netherlands                        |
|Rotterdam, Netherlands                        |
|Netherlands                                   |
|Eindhoven, Netherlands                        |
|Netherlands                                   |
|Netherlands                                   |
|Amstelveen, Netherlands                       |
|Netherlands                                   |
|The Netherlands                               |
|Netherlands        

In [10]:
# Derive a "country" column from "location" using the return_country UDF.
new_users = new_users.withColumn(
    "country",
    udf_return_country(col("location")),
)

In [11]:
# Derive a "city" column from "location" using the return_city UDF.
new_users = new_users.withColumn(
    "city",
    udf_return_city(col("location")),
)

In [12]:
# List the column names of new_users after the country and city columns were added.
new_users.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at',
 'country',
 'city']

In [13]:
# Inner-join the Netherlands users to the questions they asked
# (users.id matches questions.user_id).
joined_df = new_users.join(
    questions,
    on=new_users["id"] == questions["user_id"],
    how="inner",
)

In [14]:
# Keep only the rows whose view_count is at least 20.
joined_df = joined_df.where(col("view_count") >= 20)

In [15]:
# Inner-join the user/question rows to the answers frame on the user id.
# NOTE(review): this matches answers written by the same user who asked the
# question (user_id == user_id_answers), not answers to that question
# (id_questions == question_id), so each of a user's questions is paired with
# each of that user's answers. Confirm this is the intended relationship.
results = joined_df.join(
    answers,
    on=joined_df["user_id"] == answers["user_id_answers"],
    how="inner",
)

In [17]:
# Preview the first five rows of a few key columns of the final frame.
results.select(
    "updated_at", "view_count", "user_id", "user_id_answers"
).show(n=5)

+-------------------+----------+-------+---------------+
|         updated_at|view_count|user_id|user_id_answers|
+-------------------+----------+-------+---------------+
|2019-10-12 18:32:46|       576|7666972|        7666972|
|2019-08-05 14:34:28|       145| 555132|         555132|
|2019-03-16 12:30:45|        69|2548426|        2548426|
|2019-10-10 13:13:07|        32|5798882|        5798882|
|2019-10-10 13:13:07|        32|5798882|        5798882|
+-------------------+----------+-------+---------------+
only showing top 5 rows



In [21]:
# Show the earliest updated_at timestamp present in the final frame.
results.agg(F.min("updated_at")).show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-02-01 13:54:00|
+-------------------+



In [19]:
# List the column names of the final joined frame.
results.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at',
 'country',
 'city',
 'id_questions',
 'user_id',
 'title',
 'body',
 'accepted_answer_id',
 'score',
 'view_count',
 'comment_count',
 'created_at_questions',
 'id_answers',
 'user_id_answers',
 'question_id',
 'body_answers',
 'score_answers',
 'comment_count_answers',
 'created_at_answers']

In [20]:
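# Disabled: casting updated_at to a date. cast() needs a DataType instance (or a type-name string);
# passing the DateType class itself is what raised the TypeError shown below.
# results = results.withColumn("updated_at", col("updated_at").cast(DateType()))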

TypeError: unexpected type: <class 'type'>

In [None]:
# Append the final joined frame to the PostgreSQL table
# stackoverflow_filtered.results over JDBC.
# Connection settings are read from the environment so that credentials are
# not stored in the notebook:
#   POSTGRES_JDBC_URL  - optional, defaults to the local postgres database
#   POSTGRES_USER      - optional, defaults to "postgres"
#   POSTGRES_PASSWORD  - required; a KeyError is raised when it is not set
results.write.format("jdbc").options(
    url=os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://localhost/postgres"),
    driver="org.postgresql.Driver",
    user=os.environ.get("POSTGRES_USER", "postgres"),
    password=os.environ["POSTGRES_PASSWORD"],
    dbtable="stackoverflow_filtered.results",
).save(mode="append")
