# Stack Overflow

In [None]:
import findspark
# Must run before the `import pyspark` in the next cell.
# NOTE(review): presumably this puts the local Spark installation's Python
# package on sys.path so that import can succeed - confirm against findspark docs.
findspark.init()

In [23]:
import pyspark

# Obtain the SparkContext used by every RDD operation below.
sc = pyspark.SparkContext.getOrCreate()
# SQLContext built on the same context; none of the cells shown here use `spark`.
spark = pyspark.SQLContext(sc)

In [24]:
# Load the sample file as an RDD of text lines. Each line is one comma-separated
# record; map_data (defined below) reads its six fields as
# type_id, own_id, accepted_answer, parent_id, <unused>, tag.
data = sc.textFile('/user/adampap/stackOverflow/shortStackOverflow.txt')
# Show up to 30 records to check the input.
data.take(30)

[u'1,100,102,,2,Python',
 u'2,101,,100,5,',
 u'2,102,,100,9,',
 u'2,103,,100,3,',
 u'1,104,,,2,Python',
 u'2,105,,104,8,',
 u'2,106,,104,5,',
 u'1,200,,,2,C#',
 u'2,201,,200,5,',
 u'2,202,,200,1,',
 u'2,203,,200,8,',
 u'2,204,,200,8,',
 u'2,205,,200,8,',
 u'1,206,,,1,C#',
 u'2,207,,206,11,',
 u'2,208,,206,6,',
 u'1,300,,,9,Java',
 u'1,301,304,,7,Java',
 u'2,302,,301,9,',
 u'2,303,,301,3,',
 u'2,304,,301,9,',
 u'1,305,306,,3,Java',
 u'2,306,,305,5,',
 u'1,307,309,,3,Java',
 u'2,308,,307,5,',
 u'2,309,,307,5,',
 u'1,400,402,,2,PHP',
 u'2,401,,400,2,',
 u'2,402,,400,7,',
 u'1,403,404,,2,PHP']

In [25]:
# Values of the first field (type_id) that identify the kind of record.
QUESTION = '1'
ANSWER = '2'


def map_data(line):
    """Turn one comma-separated record into a keyed partial count.

    The line must split into exactly six comma-separated fields, read here as
    ``type_id, own_id, accepted_answer, parent_id, <ignored>, tag``.

    Returns:
        * for a question (``type_id == QUESTION``):
          ``(int(own_id), (1, 0, is_solved, tag))`` where ``is_solved`` is
          True when the accepted-answer field is non-empty;
        * for an answer (``type_id == ANSWER``):
          ``(int(parent_id), (0, 1))``;
        * for any other ``type_id``: ``None``.

    Raises:
        ValueError: if the line does not have exactly six fields, or the id
            that becomes the key is not a valid integer.
    """
    fields = line.split(',')
    type_id, own_id, accepted_answer, parent_id, _, tag = fields

    if type_id == ANSWER:
        # Answers are keyed by the question they belong to.
        return (int(parent_id), (0, 1))

    if type_id == QUESTION:
        solved = bool(accepted_answer)
        return (int(own_id), (1, 0, solved, tag))

    # Unknown record type: nothing to emit.
    return None

# Parse every raw line into (question_id, partial_counts); see map_data.
data = data.map(map_data)
data.take(30)

[(100, (1, 0, True, u'Python')),
 (100, (0, 1)),
 (100, (0, 1)),
 (100, (0, 1)),
 (104, (1, 0, False, u'Python')),
 (104, (0, 1)),
 (104, (0, 1)),
 (200, (1, 0, False, u'C#')),
 (200, (0, 1)),
 (200, (0, 1)),
 (200, (0, 1)),
 (200, (0, 1)),
 (200, (0, 1)),
 (206, (1, 0, False, u'C#')),
 (206, (0, 1)),
 (206, (0, 1)),
 (300, (1, 0, False, u'Java')),
 (301, (1, 0, True, u'Java')),
 (301, (0, 1)),
 (301, (0, 1)),
 (301, (0, 1)),
 (305, (1, 0, True, u'Java')),
 (305, (0, 1)),
 (307, (1, 0, True, u'Java')),
 (307, (0, 1)),
 (307, (0, 1)),
 (400, (1, 0, True, u'PHP')),
 (400, (0, 1)),
 (400, (0, 1)),
 (403, (1, 0, True, u'PHP'))]

In [26]:
def is_question(line):
    """Return True when the value tuple has more than two entries.

    In this pipeline, answer-only values are ``(questions, answers)`` pairs,
    while values that include a question also carry ``is_solved`` and ``tag``.
    """
    entry_count = len(line)
    return entry_count > 2

def reduce_by_key(first_line, second_line):
    """Merge two partial values that share the same question id.

    The question and answer counters (positions 0 and 1) are added. If either
    operand carries question fields (see ``is_question``), its ``is_solved``
    flag and tag (positions 2 and 3) are appended to the result; the first
    operand wins when both do.

    Returns:
        ``(questions, answers, is_solved, tag)`` when a question record is
        involved, otherwise ``(questions, answers)``.
    """
    totals = (
        first_line[0] + second_line[0],
        first_line[1] + second_line[1],
    )

    # Check the operands in order so the first one takes precedence.
    for candidate in (first_line, second_line):
        if is_question(candidate):
            return totals + (candidate[2], candidate[3])

    return totals

# Collapse all records of one question id into a single value; see reduce_by_key.
data = data.reduceByKey(reduce_by_key)
data.take(30)

[(100, (1, 3, True, u'Python')),
 (104, (1, 2, False, u'Python')),
 (300, (1, 0, False, u'Java')),
 (206, (1, 2, False, u'C#')),
 (400, (1, 2, True, u'PHP')),
 (200, (1, 5, False, u'C#')),
 (305, (1, 1, True, u'Java')),
 (307, (1, 2, True, u'Java')),
 (403, (1, 1, True, u'PHP')),
 (301, (1, 3, True, u'Java'))]

In [27]:
def map_questions(value):
    """Re-key one aggregated question value by ``(tag, is_solved)``.

    Args:
        value: a four-item sequence
            ``(question_count, number_of_answers, is_solved, tag)``;
            the first item is ignored.

    Returns:
        ``((tag, is_solved), number_of_answers)``.

    Raises:
        ValueError: if ``value`` does not have exactly four items.
    """
    _unused_count, answers, solved, tag_name = value
    group_key = (tag_name, solved)
    return (group_key, answers)

# Drop the question id and keep only the aggregated value, re-keyed by
# (tag, is_solved); see map_questions.
data = data.map(lambda keyed_record: map_questions(keyed_record[1]))
data.take(30)

[((u'Python', True), 3),
 ((u'Python', False), 2),
 ((u'Java', False), 0),
 ((u'C#', False), 2),
 ((u'PHP', True), 2),
 ((u'C#', False), 5),
 ((u'Java', True), 1),
 ((u'Java', True), 2),
 ((u'PHP', True), 1),
 ((u'Java', True), 3)]

In [28]:
# For every (tag, is_solved) key, build (sum of answer counts, number of questions).
data = data.combineByKey(
    # First value seen for a key: start the accumulator with one question.
    lambda answers: (answers, 1),
    # Fold another question's answer count into an existing accumulator.
    lambda acc, answers: (acc[0] + answers, acc[1] + 1),
    # Merge two accumulators of the same key.
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
data.take(30)

[((u'C#', False), (7, 2)),
 ((u'Python', False), (2, 1)),
 ((u'PHP', True), (3, 2)),
 ((u'Java', False), (0, 1)),
 ((u'Python', True), (3, 1)),
 ((u'Java', True), (6, 3))]

In [29]:
# Placeholder for statistics that are not defined.
NAN = float('nan')


def divide(a, b):
    """Return ``a / b`` as a float, or ``NAN`` when ``b`` equals zero."""
    if b != 0:
        return float(a) / b
    return NAN

def calculate_statistics(line):
    """Convert one ``((tag, is_solved), (answers, questions))`` record into per-tag stats.

    The average number of answers per question is computed with ``divide``.
    The result value has four slots:
    ``(solved_questions, solved_average, unsolved_questions, unsolved_average)``.
    Only the pair matching ``is_solved`` is filled in; the other pair is
    ``(0, NAN)``.

    Returns:
        ``(tag, stats)`` with ``stats`` being the four-slot tuple above.
    """
    (tag, is_solved), (answer_total, question_total) = line

    filled = (question_total, divide(answer_total, question_total))
    missing = (0, NAN)

    if is_solved:
        stats = filled + missing
    else:
        stats = missing + filled
    return (tag, stats)

# Turn each (tag, is_solved) total into a per-tag statistics row; see calculate_statistics.
data = data.map(calculate_statistics)
data.take(30)

[(u'C#', (0, nan, 2, 3.5)),
 (u'Python', (0, nan, 1, 2.0)),
 (u'PHP', (2, 1.5, 0, nan)),
 (u'Java', (0, nan, 1, 0.0)),
 (u'Python', (1, 3.0, 0, nan)),
 (u'Java', (3, 2.0, 0, nan))]

In [30]:
import math
def reduce_statistics(first_line, second_line):
    """Add two statistics tuples position by position, counting NaN as 0.0.

    Every NaN entry of either input is replaced by ``0.0`` before the
    addition. The result is as long as the shorter input.

    Returns:
        A tuple holding the element-wise sums.
    """
    def nan_to_zero(values):
        # NaN marks "no data" and must not poison the sum.
        return [0.0 if math.isnan(item) else item for item in values]

    cleaned_first = nan_to_zero(first_line)
    cleaned_second = nan_to_zero(second_line)

    return tuple(left + right for left, right in zip(cleaned_first, cleaned_second))
# Merge the solved and unsolved rows of each tag into one; see reduce_statistics.
# Tags with a single row are left as they are, so their NaN slots remain.
data = data.reduceByKey(reduce_statistics)
data.take(30)

[(u'Python', (1, 3.0, 1, 2.0)),
 (u'C#', (0, nan, 2, 3.5)),
 (u'Java', (3, 2.0, 1, 0.0)),
 (u'PHP', (2, 1.5, 0, nan))]

In [31]:
# Order tags by their number of solved questions (first statistics slot),
# largest first; the key is negated to get descending order.
data = data.sortBy(lambda tag_row: -tag_row[1][0])
data.take(30)


[(u'Java', (3, 2.0, 1, 0.0)),
 (u'PHP', (2, 1.5, 0, nan)),
 (u'Python', (1, 3.0, 1, 2.0)),
 (u'C#', (0, nan, 2, 3.5))]