In [4]:
from pyspark import SparkConf, SparkContext
import pandas as pd

# Spark configuration: master "local" and application name "FriendsByAge".
conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
# getOrCreate returns the already-running context if there is one, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once".
# NOTE: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

In [11]:
def parseLine(line):
    """Convert one comma-separated record into an ``(age, numFriends)`` tuple.

    The third field (index 2) is read as the age and the fourth (index 3) as
    the number of friends; both are converted with ``int``. Any fields after
    the fourth are ignored.
    """
    columns = line.split(',')
    return int(columns[2]), int(columns[3])

In [17]:
# Load the CSV through Spark as an RDD whose elements are the file's text lines.
lines = sc.textFile("../data/fakefriends.csv")

In [18]:
# Turn every CSV line into an (age, numFriends) pair via parseLine.
rdd = lines.map(parseLine)

In [24]:
# Peek at the first five (age, numFriends) pairs. take(5) only fetches what it needs
# instead of collecting the entire RDD to the driver and slicing afterwards.
rdd.take(5)

[(33, 385), (26, 2), (55, 221), (40, 465), (68, 21)]

In [26]:
# Pair every friend count with a 1 so sums and record counts can be reduced together.
# take(5) avoids pulling the whole RDD to the driver just to show five rows.
rdd.mapValues(lambda x: (x, 1)).take(5)

[(33, (385, 1)), (26, (2, 1)), (55, (221, 1)), (40, (465, 1)), (68, (21, 1))]

In [27]:
# Per age: element-wise add (friendsSum, recordCount) pairs, then peek at five results.
(
    rdd.mapValues(lambda x: (x, 1))
       .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
       .take(5)
)

[(18, (2747, 8)),
 (19, (2346, 11)),
 (20, (825, 5)),
 (21, (2807, 8)),
 (22, (1445, 7))]

In [28]:
# (age, (friendsSum, recordCount)) for every age.
# Named `totalsByAge` because that is the name the following cells read; the previous
# name `totalByAge` left them raising NameError on a fresh kernel.
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [29]:
# Preview the per-age averages (sum / count); take(5) avoids a full collect.
totalsByAge.mapValues(lambda x: x[0] / x[1]).take(5)

[(18, 343.375),
 (19, 213.27272727272728),
 (20, 165.0),
 (21, 350.875),
 (22, 206.42857142857142)]

In [30]:
# Average friends per age = summed friend count / number of records for that age.
averagesByAge = totalsByAge.mapValues(lambda total_and_count: total_and_count[0] / total_and_count[1])

In [31]:
# Bring all (age, average) pairs back to the driver as a Python list.
results = averagesByAge.collect()

In [32]:
# Print one (age, average) tuple per line.
for age, average in results:
    print((age, average))

(18, 343.375)
(19, 213.27272727272728)
(20, 165.0)
(21, 350.875)
(22, 206.42857142857142)
(23, 246.3)
(24, 233.8)
(25, 197.45454545454547)
(26, 242.05882352941177)
(27, 228.125)
(28, 209.1)
(29, 215.91666666666666)
(30, 235.8181818181818)
(31, 267.25)
(32, 207.9090909090909)
(33, 325.3333333333333)
(34, 245.5)
(35, 211.625)
(36, 246.6)
(37, 249.33333333333334)
(38, 193.53333333333333)
(39, 169.28571428571428)
(40, 250.8235294117647)
(41, 268.55555555555554)
(42, 303.5)
(43, 230.57142857142858)
(44, 282.1666666666667)
(45, 309.53846153846155)
(46, 223.69230769230768)
(47, 233.22222222222223)
(48, 281.4)
(49, 184.66666666666666)
(50, 254.6)
(51, 302.14285714285717)
(52, 340.6363636363636)
(53, 222.85714285714286)
(54, 278.0769230769231)
(55, 295.53846153846155)
(56, 306.6666666666667)
(57, 258.8333333333333)
(58, 116.54545454545455)
(59, 220.0)
(60, 202.71428571428572)
(61, 256.22222222222223)
(62, 220.76923076923077)
(63, 384.0)
(64, 281.3333333333333)
(65, 298.2)
(66, 276.4444444444444

# Pandas로 한다면..! (The same per-age average, computed with pandas)

In [43]:
# Load the same CSV with pandas. header=None because the file has no header row,
# so the columns come back labelled 0..3.
fake_df = pd.read_csv("../data/fakefriends.csv", header=None)

In [49]:
# Drop column 0 (a per-row id that the analysis does not use).
# Reassign instead of inplace=True, and make head() the last expression so it is
# actually displayed (previously its result was discarded).
fake_df = fake_df.drop(columns=0)
fake_df.head()

In [56]:
# Give the remaining columns meaningful names (reassignment instead of inplace=True).
fake_df = fake_df.rename(columns={1: "name", 2: "age", 3: "counts"})

In [57]:
# Inspect the cleaned frame: the id column is gone and the rest are name / age / counts.
fake_df.head()

Unnamed: 0,name,age,counts
0,Will,33,385
1,Jean-Luc,26,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21


In [68]:
# Average friend count per age. Select the numeric column explicitly: averaging the
# string "name" column raises TypeError in pandas >= 2.0 (older versions silently
# dropped it). The double brackets keep the result a DataFrame, as before.
fake_df.groupby("age")[["counts"]].mean()

Unnamed: 0_level_0,counts
age,Unnamed: 1_level_1
18,343.375
19,213.272727
20,165.0
21,350.875
22,206.428571
23,246.3
24,233.8
25,197.454545
26,242.058824
27,228.125


# 시간 비교 (Timing comparison: the same per-age average with Spark vs. pandas)

In [74]:
# Reload both inputs from the raw file so the timed cells below start from unmodified data
# (the pandas cell drops column 0 again, which needs the raw frame).
FAKEFRIENDS_CSV = "../data/fakefriends.csv"
lines = sc.textFile(FAKEFRIENDS_CSV)
fake_df = pd.read_csv(FAKEFRIENDS_CSV, header=None)

In [77]:
%%time
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
    print(result)

(18, 343.375)
(19, 213.27272727272728)
(20, 165.0)
(21, 350.875)
(22, 206.42857142857142)
(23, 246.3)
(24, 233.8)
(25, 197.45454545454547)
(26, 242.05882352941177)
(27, 228.125)
(28, 209.1)
(29, 215.91666666666666)
(30, 235.8181818181818)
(31, 267.25)
(32, 207.9090909090909)
(33, 325.3333333333333)
(34, 245.5)
(35, 211.625)
(36, 246.6)
(37, 249.33333333333334)
(38, 193.53333333333333)
(39, 169.28571428571428)
(40, 250.8235294117647)
(41, 268.55555555555554)
(42, 303.5)
(43, 230.57142857142858)
(44, 282.1666666666667)
(45, 309.53846153846155)
(46, 223.69230769230768)
(47, 233.22222222222223)
(48, 281.4)
(49, 184.66666666666666)
(50, 254.6)
(51, 302.14285714285717)
(52, 340.6363636363636)
(53, 222.85714285714286)
(54, 278.0769230769231)
(55, 295.53846153846155)
(56, 306.6666666666667)
(57, 258.8333333333333)
(58, 116.54545454545455)
(59, 220.0)
(60, 202.71428571428572)
(61, 256.22222222222223)
(62, 220.76923076923077)
(63, 384.0)
(64, 281.3333333333333)
(65, 298.2)
(66, 276.4444444444444

In [78]:
%%time
fake_df.drop(0, axis=1, inplace=True)
fake_df.rename(columns={1:"name",2:"age",3:"counts"}, inplace=True)
fake_df.groupby("age").agg('mean')
print(fake_df.groupby("age").agg('mean'))

         counts
age            
18   343.375000
19   213.272727
20   165.000000
21   350.875000
22   206.428571
23   246.300000
24   233.800000
25   197.454545
26   242.058824
27   228.125000
28   209.100000
29   215.916667
30   235.818182
31   267.250000
32   207.909091
33   325.333333
34   245.500000
35   211.625000
36   246.600000
37   249.333333
38   193.533333
39   169.285714
40   250.823529
41   268.555556
42   303.500000
43   230.571429
44   282.166667
45   309.538462
46   223.692308
47   233.222222
48   281.400000
49   184.666667
50   254.600000
51   302.142857
52   340.636364
53   222.857143
54   278.076923
55   295.538462
56   306.666667
57   258.833333
58   116.545455
59   220.000000
60   202.714286
61   256.222222
62   220.769231
63   384.000000
64   281.333333
65   298.200000
66   276.444444
67   214.625000
68   269.600000
69   235.200000
Wall time: 16 ms


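### pandas 속도도 꽤 좋은데..? 데이터가 적어서 그런가.. 의문점..!
Most likely yes: the CSV is tiny. Spark here runs with `setMaster("local")`, i.e. a single worker thread. Every run also pays fixed costs: scheduling a job, shuffling for `reduceByKey`, and shipping the Python lambdas to the worker. On data this small those costs dominate, while pandas does the whole groupby in-process. Spark's advantage should only appear once the data is large enough to benefit from many cores or machines.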