# Clean and extract Data

In [1]:
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

filepath_read = "Data/Wikipedia_Editor_Survey_2012_-_anonymized_dataset.csv"
filepath_save = "Data/Wikipedia_Editor_Survey_2012_-_anonymized_dataset_clean.csv"
TimeSpent, Age, Education = ["Q20_1_TEXT", "Q21", "Q22"]

df = pd.read_csv(filepath_read)

df = df[[Age,TimeSpent,Education]]     # Strip columns
df = df[pd.notnull(df[TimeSpent])]     # Remove NaN
df = df[pd.notnull(df[Age])]           # Remove NaN
df = df[pd.notnull(df[Education])]     # Remove NaN

df = df[df[Age] != "Decline to state"] # remove decline
df.loc[df[Age] == "_ 18", Age] = 0     # less than 18 => 0
df.loc[df[Age] == "> 99", Age] = 100   # More than 99 => 100

df = df.iloc[1:]                       # Remove first row

df = df.astype(int)

df.to_csv(filepath_save, index=False, header=False)

df.head()

Unnamed: 0,Q21,Q20_1_TEXT,Q22
7,26,0,4
8,71,40,2
9,27,0,3
10,18,2,2
13,42,0,3


### Example Load Clean Data Pandas

In [2]:
filepath = "Data/Wikipedia_Editor_Survey_2012_-_anonymized_dataset_clean.csv"
names = ["Age","Time","Education"]
df = pd.read_csv(filepath, names=names, dtype=dict((n,int) for n in names))

df.head()

Unnamed: 0,Age,Time,Education
0,26,0,4
1,71,40,2
2,27,0,3
3,18,2,2
4,42,0,3


### Example Load Clean Data Open(...)

In [3]:
with open(filepath) as f:
    for i,line in enumerate(f):
        print("line",i,":",line.rstrip())
        if i == 4:
            break

line 0 : 26,0,4
line 1 : 71,40,2
line 2 : 27,0,3
line 3 : 18,2,2
line 4 : 42,0,3


# MapReduce Answers

### Bins

In [4]:
%%file Scripts\_bins.py
# This file serves as common binning functions for all MapReduce classes

_agebins = [19, 25, 35, 50, 64] # + [ >= 64 ]
_timebins = [1, 3, 6, 10, 16] # + [ >= 16 ]

def agebin(age):
    age = int(age)
    for i,val in enumerate(_agebins):
        if age < val:
            return i
    return len(_agebins)
        
def timebin(time):
    time = int(time)
    for i,val in enumerate(_timebins):
        if time < val:
            return i
    return len(_timebins)

Overwriting Scripts\_bins.py


### Count Age Groups MR

In [5]:
%%file Scripts\agegroups.py
from mrjob.job import MRJob
from _bins import agebin

class MRJobAgeGroups(MRJob):
    
    # Map: keyval = (age,1)
    def mapper(self, _, line):
        answers = line.split(",")
        yield agebin(answers[0]), 1
    
    # Red: keyval = (age,sum([1,1..1]))
    def reducer(self, agegroup, counts):
        yield agegroup, sum(counts)
    
if __name__ == '__main__':
    MRJobAgeGroups.run()

Overwriting Scripts\agegroups.py


### Count Distinct Grouped Answers MR

In [6]:
%%file Scripts\distinct.py
from mrjob.job import MRJob
from _bins import agebin,timebin

class MRJobDistinct(MRJob):
    
    # Map: keyval = ((age,time,education),1)
    def mapper(self, _, line):
        answers = line.split(",")
        yield (agebin(answers[0]), timebin(answers[1]), int(answers[2])), 1
    
    # Red: keyval = ((age,time,education),sum([1,1..1]))
    def reducer(self, group, counts):
        yield group, sum(counts)
    
if __name__ == '__main__':
    MRJobDistinct.run()

Overwriting Scripts\distinct.py


### Pearson Age Time Correlation MR

In [7]:
%%file Scripts\corr.py
from mrjob.job import MRJob
from mrjob.step import MRStep
from _bins import agebin,timebin

class MRJobPearsonCorr(MRJob):
    
    keys = ["n","x","xx","xy","y","yy"]
    
    # Mapper mapping each partial sum of each sum-variable for each line in file
    # Map: keyval = ("var_i",var_i_j) - var_i = {"n","x","xx","xy","y","y"}
    def mapper(self, _, line):
        answers = line.split(",")
        xi,yi = agebin(answers[0]), timebin(answers[1])
        yield self.keys[0], 1,
        yield self.keys[1], xi
        yield self.keys[2], xi*xi
        yield self.keys[3], xi*yi
        yield self.keys[4], yi
        yield self.keys[5], yi*yi
    
    # Reducer summarising each variable
    # Map: keyval = ("var_i",sum(var_i_j)) - var_i = {"n","x","xx","xy","y","y"} - j = 0..n-1
    def reducer_sum(self, var, values):
        yield None, (var, sum(values))
     
    # Reducer extracting variables and computing PCC
    # Map: keyval = ("PCC", PCC(n,x,xx,xy,y,yy))
    def reducer_pearson(self, _, varsumpairs):
        varsumpairs = dict(varsumpairs)
        n,x,xx,xy,y,yy = [varsumpairs[k] for k in self.keys]
        yield "PCC", ( xy-x*y/n ) / ( (xx-(x**2)/n) * (yy-(y**2)/n) )**0.5
    
    # 2 step to calculate PCC for age and time
    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer_sum),
            MRStep(reducer=self.reducer_pearson)
        ]
    
if __name__ == '__main__':
    MRJobPearsonCorr.run()

Overwriting Scripts\corr.py


### Complete analysis MR

In [8]:
%%file Scripts\full.py
from mrjob.job import MRJob
from mrjob.step import MRStep
from _bins import agebin,timebin

class MRJobFull(MRJob):
    
    keys = ["n","x","xx","xy","y","yy"]
    
    # Mapper reading input file into binned (age,time) pairs for counting
    # Map: keyval = ((age,time),1)
    def mapper_1(self, _, line):
        answers = line.split(',')
        yield (agebin(answers[0]), timebin(answers[1])), 1
    
    # Reducer counting co-occurance of each distinct (age,time) pair
    # Red: keyval = (age, (time_i, sum([1,1, ... 1])) - i = 0..5
    def reducer_1(self, group, values):
        yield group[0], (group[1], sum(values))
    
    # Reducer collecting list of co-occurancens of age and time-values for each bin sorted by timebin index
    # Red: keyval = (age, [sum(t0), sum(t1), sum(t2), ... sum(t6)])
    def reducer_2(self, a, tvals):
        yield None, (a, [val for _,val in sorted(tvals, key=lambda tval: tval[0])])
    
    # Reducer collecting list of co-occurances for each group-pair
    # Red: keyval = ((groupi,groupj),([vals_i],[vals_j])) - i = 0..5 - j = [1..4] > i
    def reducer_3(self, _, avalpairs):
        avalpairs = list(avalpairs)
        for i,keyvali in enumerate(avalpairs):
            for keyvalj in avalpairs[i+1:]:
                x, vali = keyvali
                y, valj = keyvalj
                key = str(x) + str(y)
                yield key, (vali, valj)
    
    # Mapper mapping the PCC variables for each group-pair
    # Map: keyval = ((group-pair,"var_i"),var_i_j) - var_i = {"n","x","xx","xy","y","y"} - j = 0..n-1
    def mapper_2(self, key, pair):
        x = list(pair[0])
        y = list(pair[1])
        sumx = sum(x)
        sumy = sum(y)
        
        for xi,yi in zip(x,y):
            xi /= sumx # turn into percentages
            yi /= sumy # turn into percentages
            yield (key,self.keys[0]), 1
            yield (key,self.keys[1]), xi
            yield (key,self.keys[2]), xi**2
            yield (key,self.keys[3]), xi*yi
            yield (key,self.keys[4]), yi
            yield (key,self.keys[5]), yi**2
    
    # Reducer summarising each group-pair PCC-variables
    # Red: keyval = (group-pair,("var_i",sum(var_i_j)))  - var_i = {"n","x","xx","xy","y","y"} - j = 0..n-1
    def reducer_4(self, groups_vars, values):
        yield groups_vars[0], (groups_vars[1], sum(values))
    
    # Reudcer extracting variables for each group and computing PCC
    # Red: keyval = (("PCC", group-pair), PCC(n,x,xx,xy,y,yy))
    def reducer_5(self, groups, varsumpairs):
        varsumpairs = dict(varsumpairs)
        n,x,xx,xy,y,yy = [varsumpairs[k] for k in self.keys]
        key = ('PCC',f'Group {groups[0]}', f'Group {groups[1]}')
        yield key, ( xy-x*y/n ) / ( (xx-(x**2)/n) * (yy-(y**2)/n) )**0.5

    # 5 Steps to complete PCC for all group-pairs
    def steps(self):
        return [MRStep(mapper=self.mapper_1, reducer=self.reducer_1),
                MRStep(reducer=self.reducer_2),
                MRStep(reducer=self.reducer_3),
                MRStep(mapper=self.mapper_2, reducer=self.reducer_4),
                MRStep(reducer=self.reducer_5)]
    
if __name__ == '__main__':
    MRJobFull.run()

Overwriting Scripts\full.py


### Test

In [9]:
!python Scripts\agegroups.py Data\Wikipedia_Editor_Survey_2012_-_anonymized_dataset_clean.csv --quiet

0	1501
1	1792
2	2274
3	2203
4	1337
5	540


In [10]:
!python Scripts\distinct.py Data\Wikipedia_Editor_Survey_2012_-_anonymized_dataset_clean.csv --quiet

[0,0,1]	274
[0,0,2]	305
[0,0,3]	39
[0,0,4]	4
[0,0,5]	4
[0,1,1]	158
[0,1,2]	176
[0,1,3]	22
[0,1,4]	4
[0,1,5]	1
[0,2,1]	112
[0,2,2]	109
[0,2,3]	8
[0,2,5]	1
[0,3,1]	38
[0,3,2]	59
[0,3,3]	5
[0,4,1]	33
[0,4,2]	43
[0,4,3]	6
[0,4,4]	3
[0,5,1]	42
[0,5,2]	48
[0,5,3]	6
[0,5,5]	1
[1,0,1]	25
[1,0,2]	316
[1,0,3]	291
[1,0,4]	82
[1,0,5]	10
[1,1,1]	7
[1,1,2]	142
[1,1,3]	175
[1,1,4]	49
[1,1,5]	6
[1,2,1]	11
[1,2,2]	120
[1,2,3]	119
[1,2,4]	26
[1,2,5]	9
[1,3,1]	4
[1,3,2]	52
[1,3,3]	37
[1,3,4]	11
[1,3,5]	2
[1,4,1]	2
[1,4,2]	55
[1,4,3]	74
[1,4,4]	25
[1,4,5]	5
[1,5,1]	7
[1,5,2]	49
[1,5,3]	68
[1,5,4]	13
[2,0,1]	20
[2,0,2]	153
[2,0,3]	359
[2,0,4]	302
[2,0,5]	72
[2,1,1]	14
[2,1,2]	75
[2,1,3]	182
[2,1,4]	190
[2,1,5]	36
[2,2,1]	1
[2,2,2]	46
[2,2,3]	131
[2,2,4]	133
[2,2,5]	35
[2,3,1]	4
[2,3,2]	14
[2,3,3]	53
[2,3,4]	51
[2,3,5]	15
[2,4,1]	2
[2,4,2]	33
[2,4,3]	69
[2,4,4]	81
[2,4,5]	17
[2,5,1]	7
[2,5,2]	36
[2,5,3]	71
[2,5,4]	56
[2,5,5]	16
[3,0,1]	16
[3,0,2]	158
[3,0,3]	279
[3,0,4]	292
[3,0,5]	129
[3,1,1]	13
[3,1,2]	69

In [11]:
!python Scripts\corr.py Data\Wikipedia_Editor_Survey_2012_-_anonymized_dataset_clean.csv --quiet

"PCC"	0.0297173649


In [12]:
!python Scripts\full.py Data\Wikipedia_Editor_Survey_2012_-_anonymized_dataset_clean.csv --quiet

["PCC","Group 0","Group 1"]	0.9911936299
["PCC","Group 0","Group 2"]	0.9934495197
["PCC","Group 0","Group 3"]	0.9878658171
["PCC","Group 0","Group 4"]	0.9634663546
["PCC","Group 0","Group 5"]	0.9413547218
["PCC","Group 1","Group 2"]	0.9991789793
["PCC","Group 1","Group 3"]	0.9991548497
["PCC","Group 1","Group 4"]	0.9842184557
["PCC","Group 1","Group 5"]	0.9601390648
["PCC","Group 2","Group 3"]	0.9987351209
["PCC","Group 2","Group 4"]	0.9833582915
["PCC","Group 2","Group 5"]	0.9617441097
["PCC","Group 3","Group 4"]	0.9879650884
["PCC","Group 3","Group 5"]	0.9625508202
["PCC","Group 4","Group 5"]	0.9871363614


### Test

# SQLite Answers

In [13]:
import sqlite3
from textwrap import indent,dedent

class Sql:
    # Static values
    table = "wiki"
    fields = ["id", "age", "time", "education"]
    datapath = 'Data/Wikipedia_Editor_Survey_2012_-_anonymized_dataset_clean.csv'
    
    # Generate table(columns) string (for population)
    # optionally with types (for table creation)
    def table_cmd(self, crt = False):
        fld = ''
        for i,field in enumerate(self.fields):
            prmkey = ' primary key' if crt and i == 0 else ''
            fldtyp = ' integer' + prmkey if crt else ''
            fld += field + fldtyp + ', '
        return f'{self.table}({fld[:-2]})'

    # Shortcut for executing on cursor and committing to db
    # optionally without printing the query command.
    def exe(self, cmd, verbose = True):
        cmd = dedent(cmd)
        c = self.db.cursor()
        if verbose:
            print(cmd)
        c.execute(cmd + ';')
        self.db.commit()
        return c
    
    # Sets up database, creates and populates table if none is found already,
    # optionally by forcing the database to reload from file.
    def __init__(self, databaseName, reloadDb = False):
        self.db = sqlite3.connect(databaseName)
        self.db.row_factory = lambda _,row: row[0] if len(row) == 1 else row
        self.addFuncs()
        
        tableCheck = self.exe(f'SELECT name FROM sqlite_master WHERE type="table" AND name="{self.table}"', False).fetchone()
        if reloadDb and tableCheck:
            self.exe(f'drop table {self.table}')
        if reloadDb or not tableCheck:
            self.exe(f'create table {self.table_cmd(crt=True)}')
            self.populate()
    
    # Populates table from file given in the static path
    def populate(self):
        with open(self.datapath) as file:
            print("Populating from file: "+self.datapath)
            for i,line in enumerate(file):
                cmd = f'insert into {self.table_cmd()} values ({i}, {line.rstrip()})'
                self.exe(cmd)
            print("Populating completed.")
    
    # Adds non-native support for power and squareroot functions to the database
    def addFuncs(self):
        def power(a,b):
            return float(a)**float(b)
        
        def sqrt(a):
            return float(a)**0.5
        
        self.db.create_function("pow",2,power)
        self.db.create_function("sqrt",1,sqrt)
    
    # Generates query sub-string for extracting age into bins
    # tabs are purely flavour for printing prettily.
    def agebin(self, tabs=0):
        age = self.fields[1]
        substring = f"""
            (case
                when {age} < 19 then 0
                when {age} < 25 then 1
                when {age} < 35 then 2
                when {age} < 50 then 3
                when {age} < 64 then 4
                else 5
            end)"""
        return indent(dedent(substring),"    "*tabs)[1+4*tabs:]
    
    # Generates query sub-string for extracting time into bins
    # tabs are purely flavour for printing prettily.
    def timebin(self, tabs=0):
        time = self.fields[2]
        substring = f"""
            (case
                when {time} < 1 then 0
                when {time} < 3 then 1
                when {time} < 6 then 2
                when {time} < 10 then 3
                when {time} < 16 then 4
                else 5
            end)"""
        return indent(dedent(substring),"    "*tabs)[1+4*tabs:]
    
    # Execute query for counting populations of distinct age groups
    def count_age_groups(self):
        table = self.table #wiki
        command = f"""
            select
                {self.agebin(tabs=4)} as agebin,
                count(*)
            from {table}
            group by agebin
        """
        return self.exe(command).fetchall()
    
    # Execute query for counting distinct grouped answers
    def count_distinct_grouped_answers(self):
        table = self.table #wiki
        edu = self.fields[3]
        command = f"""
            select
                {self.agebin(tabs=4)} as agebin,
                {self.timebin(tabs=4)} as timebin,
                {edu},
                count(*)
            from {table}
            group by agebin,timebin,{edu}
        """
        return self.exe(command).fetchall()
    
    # Execute query for finding pearson correlation between the age and the time column
    def pearson_age_time_correlation(self):
        table = self.table #wiki
        command = f"""
            select ( xy-x*y/n ) / sqrt( ( xx-pow(x,2)/n ) * ( yy-pow(y,2)/n ) )
            from (
                select
                    count(*) as n,
                    sum(xi) as x,
                    sum(xi*xi) as xx,
                    sum(xi*yi) as xy,
                    sum(yi) as y,
                    sum(yi*yi) as yy
                from (
                    select
                        {self.agebin(tabs=6)} as xi,
                        {self.timebin(tabs=6)} as yi
                    from {table}
                )
            )
        """
        return self.exe(command).fetchone()
    
    def close(self):
        self.db.close()

### Test

In [14]:
#sql = Sql('Sqlite/test.sqlite', reloadDb=True)
#sql.close()

In [15]:
sql = Sql('Sqlite/test.sqlite', reloadDb=False)

In [16]:
sql.count_age_groups()


select
    (case
        when age < 19 then 0
        when age < 25 then 1
        when age < 35 then 2
        when age < 50 then 3
        when age < 64 then 4
        else 5
    end) as agebin,
    count(*)
from wiki
group by agebin



[(0, 1501), (1, 1792), (2, 2274), (3, 2203), (4, 1337), (5, 540)]

In [17]:
sql.count_distinct_grouped_answers()


select
    (case
        when age < 19 then 0
        when age < 25 then 1
        when age < 35 then 2
        when age < 50 then 3
        when age < 64 then 4
        else 5
    end) as agebin,
    (case
        when time < 1 then 0
        when time < 3 then 1
        when time < 6 then 2
        when time < 10 then 3
        when time < 16 then 4
        else 5
    end) as timebin,
    education,
    count(*)
from wiki
group by agebin,timebin,education



[(0, 0, 1, 274),
 (0, 0, 2, 305),
 (0, 0, 3, 39),
 (0, 0, 4, 4),
 (0, 0, 5, 4),
 (0, 1, 1, 158),
 (0, 1, 2, 176),
 (0, 1, 3, 22),
 (0, 1, 4, 4),
 (0, 1, 5, 1),
 (0, 2, 1, 112),
 (0, 2, 2, 109),
 (0, 2, 3, 8),
 (0, 2, 5, 1),
 (0, 3, 1, 38),
 (0, 3, 2, 59),
 (0, 3, 3, 5),
 (0, 4, 1, 33),
 (0, 4, 2, 43),
 (0, 4, 3, 6),
 (0, 4, 4, 3),
 (0, 5, 1, 42),
 (0, 5, 2, 48),
 (0, 5, 3, 6),
 (0, 5, 5, 1),
 (1, 0, 1, 25),
 (1, 0, 2, 316),
 (1, 0, 3, 291),
 (1, 0, 4, 82),
 (1, 0, 5, 10),
 (1, 1, 1, 7),
 (1, 1, 2, 142),
 (1, 1, 3, 175),
 (1, 1, 4, 49),
 (1, 1, 5, 6),
 (1, 2, 1, 11),
 (1, 2, 2, 120),
 (1, 2, 3, 119),
 (1, 2, 4, 26),
 (1, 2, 5, 9),
 (1, 3, 1, 4),
 (1, 3, 2, 52),
 (1, 3, 3, 37),
 (1, 3, 4, 11),
 (1, 3, 5, 2),
 (1, 4, 1, 2),
 (1, 4, 2, 55),
 (1, 4, 3, 74),
 (1, 4, 4, 25),
 (1, 4, 5, 5),
 (1, 5, 1, 7),
 (1, 5, 2, 49),
 (1, 5, 3, 68),
 (1, 5, 4, 13),
 (2, 0, 1, 20),
 (2, 0, 2, 153),
 (2, 0, 3, 359),
 (2, 0, 4, 302),
 (2, 0, 5, 72),
 (2, 1, 1, 14),
 (2, 1, 2, 75),
 (2, 1, 3, 182),
 (2, 1, 4, 

In [18]:
sql.pearson_age_time_correlation()


select ( xy-x*y/n ) / sqrt( ( xx-pow(x,2)/n ) * ( yy-pow(y,2)/n ) )
from (
    select
        count(*) as n,
        sum(xi) as x,
        sum(xi*xi) as xx,
        sum(xi*yi) as xy,
        sum(yi) as y,
        sum(yi*yi) as yy
    from (
        select
            (case
                when age < 19 then 0
                when age < 25 then 1
                when age < 35 then 2
                when age < 50 then 3
                when age < 64 then 4
                else 5
            end) as xi,
            (case
                when time < 1 then 0
                when time < 3 then 1
                when time < 6 then 2
                when time < 10 then 3
                when time < 16 then 4
                else 5
            end) as yi
        from wiki
    )
)



0.02974191490115367

In [19]:
sql.close()