In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
# Configure the driver with 3g of memory, enable Hive support, then create the
# session (or reuse one that already exists).
session_builder = SparkSession.builder.config('spark.driver.memory', '3g')
spark = session_builder.enableHiveSupport().getOrCreate()

In [2]:
! ls /data

Badges	Comments  PostLinks  Posts  Tags  Users  Votes


In [3]:
base_path = 'file:///data'

In [4]:
# Load the Posts dataset stored as parquet under base_path.
posts = spark.read.parquet(f'{base_path}/Posts')

In [5]:
posts.count()

45919817

In [6]:
posts.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- PostTypeId: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Score: integer (nullable = true)
 |-- ViewCount: integer (nullable = true)
 |-- Body: string (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- Title: string (nullable = true)
 |-- Tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- CommentCount: integer (nullable = true)
 |-- AnswerCount: integer (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastEditorUserId: integer (nullable = true)
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- FavoriteCount: integer (nullable = true)
 |-- OwnerDisplayName: string (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- LastEditorDisplayName: string (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)



In [7]:
posts.groupBy('PostTypeId').count().show()

+----------+--------+
|PostTypeId|   count|
+----------+--------+
|         1|18154493|
|         6|     324|
|         3|     167|
|         5|   49909|
|         4|   49909|
|         8|       2|
|         7|       4|
|         2|27665009|
+----------+--------+



In [8]:
QUESTION_POST_TYPE = 1
# Keep only the posts whose PostTypeId equals QUESTION_POST_TYPE.
is_question = col('PostTypeId') == QUESTION_POST_TYPE
questions = posts.filter(is_question)

In [9]:
questions.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- PostTypeId: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Score: integer (nullable = true)
 |-- ViewCount: integer (nullable = true)
 |-- Body: string (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- Title: string (nullable = true)
 |-- Tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- CommentCount: integer (nullable = true)
 |-- AnswerCount: integer (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastEditorUserId: integer (nullable = true)
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- FavoriteCount: integer (nullable = true)
 |-- OwnerDisplayName: string (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- LastEditorDisplayName: string (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)



### 1. Czy pytanie może być community-owned? (`CommunityOwnedDate`)

In [10]:
questions.where(col('CommunityOwnedDate').isNotNull()).select('Id').first()

Row(Id=839899)

### 2. Przykładowy ID zamkniętego pytania z września 2019 (`ClosedDate`)

Przykład pytania community-owned (wynik z punktu 1): https://stackoverflow.com/q/839899

In [11]:
# Show the Ids of questions whose ClosedDate falls in September 2019.
closed_in_september_2019 = (year('ClosedDate') == 2019) & (month('ClosedDate') == 9)
questions.where(closed_in_september_2019).select('Id').show()

+--------+
|      Id|
+--------+
|  206258|
| 2279993|
|23043938|
|53167196|
|57614714|
|57649465|
|57689162|
|57029989|
|57576987|
|47334139|
|40689858|
|57722308|
|57726456|
|57727515|
|57728075|
|57730003|
|57733132|
|57734725|
|57734751|
|57736739|
+--------+
only showing top 20 rows



In [12]:

# mało badży (do tego czasu)
# za krótkie pytanie -> mniej niż 100 znaków
# długość pytania
# długość tytułu
# wiek
# autor dopiero zarejestrowany (+1)
# niski view count
# tytuł kończy się znakiem zapytania
# tytuł krótszy niż 5 znaków
# CAPS lock w tytule
# dużo znaków interpunkcyjnych w tytule
# obecność obrazków
# "internet"
# "please"
# title zaczyna się wielką literą
# title zaczyna się od "how", "what" albo "why"
# godzina
# czy weekend?
# ile postów usera zostało zamkniętych (do tego czasu)
# ile pytań zadał dotąd
# ma "defaultowy" nick?
# wpisana lokacja?
# owner user id jest pusty?

## Cechy na początek:

 * ocena na minusie (+1)
 * obecność kodu (+1)
 * autor dopiero zarejestrowany (+1)
 * czy 0 tagów?
 * czy 1 tag?
 * czy >1 tag?

```
this is my code
``` 


-> <pre><code>


this is `my` code

--> <code>

In [13]:
questions \
    .show()

+------+--------+----------+--------------------+-----+---------+--------------------+-----------+--------------------+--------------------+--------------------+------------+-----------+--------------------+----------------+----------------+-------------+----------------+----------+---------------------+--------------------+
|    Id|ParentId|PostTypeId|        CreationDate|Score|ViewCount|                Body|OwnerUserId|    LastActivityDate|               Title|                Tags|CommentCount|AnswerCount|        LastEditDate|LastEditorUserId|AcceptedAnswerId|FavoriteCount|OwnerDisplayName|ClosedDate|LastEditorDisplayName|  CommunityOwnedDate|
+------+--------+----------+--------------------+-----+---------+--------------------+-----------+--------------------+--------------------+--------------------+------------+-----------+--------------------+----------------+----------------+-------------+----------------+----------+---------------------+--------------------+
|839854|    null|  

In [14]:
import re
def how_many_code_letters(body):
    """Count the characters found inside ``<code>...</code>`` elements of ``body``.

    Args:
        body: HTML text of a post, or ``None``.

    Returns:
        The summed length of the text of every ``<code>`` element matched by
        the pattern below; ``0`` when ``body`` is ``None`` or nothing matches.

    Note:
        Lengths are measured on the raw HTML, so an entity such as ``&lt;``
        counts as four characters.
    """
    if body is None:  # identity check is the idiomatic way to test for None
        return 0
    # `[^<]+` stops at the first `<`, so empty elements and elements that
    # contain nested markup are not counted.
    return sum(len(snippet) for snippet in re.findall('<code>([^<]+)</code>', body))

In [15]:
# Sanity checks for how_many_code_letters: (HTML snippet, expected character count).
code_length_cases = [
    ('<code>abc</code> <code>1</code>', 4),
    ('<pre><code>abc</code></pre> <code>1</code>', 4),
    ('\n<pre><code>line1\nline2</code></pre> <code>1</code>\n', 12),
]
for snippet, expected_length in code_length_cases:
    assert how_many_code_letters(snippet) == expected_length

In [16]:
how_many_code_letters_udf = udf(how_many_code_letters, IntegerType())

In [17]:
users = spark.read.parquet(f'{base_path}/Users')
# users.printSchema()

In [18]:
# Number of days from the author's registration date to the question's creation date.
days_difference_between_reg_and_post = datediff(
    questions.CreationDate, users.CreationDate
)

# Inner join on the author id: questions without a matching user row are dropped.
questions_with_authors = questions.join(users, questions.OwnerUserId == users.Id)

body_col = col('Body')
tag_count = size(col('Tags'))

# One row per question: its Id, the candidate features, and the `is_closed` flag.
input_df = questions_with_authors.select(
    questions.Id,
    (col('Score') < 0).alias('is_score_less_than_zero'),
    col('Score'),
    body_col.contains('<code>').alias('has_code_attached'),
    how_many_code_letters_udf(body_col).alias('code_length'),
    (days_difference_between_reg_and_post < 2).alias('is_author_just_registered'),
    days_difference_between_reg_and_post.alias('how_many_days_ago_user_registered'),
    col('ClosedDate').isNotNull().alias('is_closed'),
    (tag_count == 1).alias('has_one_tag'),
    (tag_count == 2).alias('has_two_tags'),
    (tag_count >= 3).alias('has_more_than_two_tags'),
)

## Przydatne funkcje

 * `unix_timestamp`
 * `regexp_extract`

In [19]:
input_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- is_score_less_than_zero: boolean (nullable = true)
 |-- Score: integer (nullable = true)
 |-- has_code_attached: boolean (nullable = true)
 |-- code_length: integer (nullable = true)
 |-- is_author_just_registered: boolean (nullable = true)
 |-- how_many_days_ago_user_registered: integer (nullable = true)
 |-- is_closed: boolean (nullable = false)
 |-- has_one_tag: boolean (nullable = false)
 |-- has_two_tags: boolean (nullable = false)
 |-- has_more_than_two_tags: boolean (nullable = false)



## MLLib

* `label` -> liczba
* `features` -> tablica liczb

In [20]:
from pyspark.ml.feature import VectorAssembler

In [21]:
'''

'''

'\n\n'

In [22]:
# Columns of input_df that are packed, in this order, into the `features` vector.
feature_columns = [
    'is_score_less_than_zero',
    'Score',
    'has_code_attached',
    'code_length',
    'is_author_just_registered',
    'how_many_days_ago_user_registered',
    'has_one_tag',
    'has_two_tags',
    'has_more_than_two_tags',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Add the assembled vector, then cast the boolean `is_closed` column to int.
assembled_df = assembler.transform(input_df)
model_input_df = assembled_df.withColumn('is_closed', col('is_closed').cast('int'))

In [None]:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

lr = LogisticRegression(labelCol="is_closed")
my_first_model = lr.fit(model_input_df)

In [None]:
my_first_model.interceptVector

In [None]:
my_first_model.coefficientMatrix

In [None]:
summary = my_first_model.summary
print(summary.areaUnderROC)

In [None]:
print(summary.accuracy)

In [None]:
summary.precisionByLabel

In [None]:
summary.recallByLabel

In [None]:
%matplotlib inline

In [None]:
import matplotlib.pyplot as plt

# Collect the ROC points once, so FPR and TPR come from the same Spark job and
# stay paired row by row, then unpack the Rows into flat lists for matplotlib.
roc_points = summary.roc.collect()
false_positive_rates = [point['FPR'] for point in roc_points]
true_positive_rates = [point['TPR'] for point in roc_points]

plt.figure(figsize=(5, 5))
plt.plot([0, 1], [0, 1], 'r--')  # diagonal reference line
plt.plot(false_positive_rates, true_positive_rates)
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC')
plt.show()

* persystencja
* test/train
* cross-walidacja

In [None]:
# overwrite() keeps this cell re-runnable: a plain save() raises an error when
# the target path already exists.
my_first_model.write().overwrite().save('/models/lr_v1')

In [None]:
loaded_model = LogisticRegressionModel.load('/models/lr_v1')
loaded_model.coefficientMatrix

In [None]:
loaded_model.transform(model_input_df).first()

In [None]:
(train, test) = model_input_df.randomSplit([0.7, 0.3], seed=42)

In [None]:
train.count()

In [None]:
test.count()