
Commit 93b0661

add spark api
1 parent a692e5c commit 93b0661

6 files changed: +250 −1 lines changed

misc/SPARK.md

+18
@@ -0,0 +1,18 @@
# Spark Tinkering

Run the Spark + Jupyter container with the command below, then go to [http://localhost:8888](http://localhost:8888).

```bash
docker run -it \
    -p 9870:9870 \
    -p 8088:8088 \
    -p 8080:8080 \
    -p 18080:18080 \
    -p 9000:9000 \
    -p 8888:8888 \
    -p 9864:9864 \
    -v $HOME/git/py-pair/misc/ipynb:/root/ipynb \
    -e PYSPARK_MASTER=spark://localhost:7077 \
    -e NOTEBOOK_PASSWORD='' \
    oneoffcoder/spark-jupyter
```
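The first cell of the notebook added below calls `sqlContext.createDataFrame(...)` without defining `sqlContext`, so it presumably relies on the container image injecting that handle. A minimal sketch for creating the handle yourself when running outside the container (the session settings here are illustrative assumptions, not part of this commit):

```python
from pyspark.sql import SparkSession, SQLContext

# Assumed local setup; the spark-jupyter container normally provides these handles already.
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('spark-tinkering') \
    .getOrCreate()
sqlContext = SQLContext(spark.sparkContext)  # legacy handle the notebook expects
```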

misc/ipynb/binary-binary.ipynb

+135
@@ -0,0 +1,135 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "get_data = lambda x, y, n: [(x, y) * 2 for _ in range(n)]\n",
    "data = get_data(1, 1, 207) + get_data(1, 0, 282) + get_data(0, 1, 231) + get_data(0, 0, 242)\n",
    "pdf = pd.DataFrame(data, columns=['x1', 'x2', 'x3', 'x4'])\n",
    "sdf = sqlContext.createDataFrame(pdf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- x1: long (nullable = true)\n",
      " |-- x2: long (nullable = true)\n",
      " |-- x3: long (nullable = true)\n",
      " |-- x4: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sdf.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "from itertools import combinations\n",
    "\n",
    "def to_counts(d):\n",
    "    def as_key(k1, k2):\n",
    "        keys = sorted([k1, k2])\n",
    "        return keys[0], keys[1]\n",
    "    \n",
    "    def as_count(v1, v2):\n",
    "        a, b, c, d = 0, 0, 0, 0\n",
    "        if v1 is not None and v2 is not None:\n",
    "            if v1 == 1 and v2 == 1:\n",
    "                a = 1\n",
    "            elif v1 == 1 and v2 == 0:\n",
    "                b = 1\n",
    "            elif v1 == 0 and v2 == 1:\n",
    "                c = 1\n",
    "            else:\n",
    "                d = 1\n",
    "        return a, b, c, d\n",
    "    \n",
    "    def transform(k1, k2):\n",
    "        v1, v2 = d[k1], d[k2]\n",
    "        return as_key(k1, k2), as_count(v1, v2)\n",
    "    \n",
    "    return [transform(k1, k2) for k1, k2 in combinations(d.keys(), 2)]\n",
    "\n",
    "def add_counts(a, b):\n",
    "    return a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3]\n",
    "\n",
    "# to_counts({'x1': 1, 'x2': 1, 'x3': 1, 'x4': 1})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[(('x1', 'x2'), (207, 282, 231, 242)),\n",
       " (('x1', 'x3'), (489, 0, 0, 473)),\n",
       " (('x1', 'x4'), (207, 282, 231, 242)),\n",
       " (('x2', 'x3'), (207, 231, 282, 242)),\n",
       " (('x2', 'x4'), (438, 0, 0, 524)),\n",
       " (('x3', 'x4'), (207, 282, 231, 242))]"
      ]
     },
     "execution_count": 31,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sdf.rdd\\\n",
    "    .flatMap(lambda r: to_counts(r.asDict()))\\\n",
    "    .reduceByKey(lambda a, b: add_counts(a, b))\\\n",
    "    .sortByKey()\\\n",
    "    .collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
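Each entry in the RDD result above is a flattened 2×2 contingency table for one column pair: the counts (a, b, c, d) record how many rows have the value combinations (1, 1), (1, 0), (0, 1) and (0, 0), respectively. A minimal pandas-only cross-check of the counting scheme for the ('x1', 'x2') pair (an illustration, not part of the commit):

```python
import pandas as pd

# Same synthetic data as the notebook: each row is (x, y, x, y).
get_data = lambda x, y, n: [(x, y) * 2 for _ in range(n)]
data = get_data(1, 1, 207) + get_data(1, 0, 282) + get_data(0, 1, 231) + get_data(0, 0, 242)
pdf = pd.DataFrame(data, columns=['x1', 'x2', 'x3', 'x4'])

# (a, b, c, d) = counts of (1,1), (1,0), (0,1), (0,0) for the pair (x1, x2).
ct = pd.crosstab(pdf['x1'], pdf['x2'])
a, b, c, d = ct.loc[1, 1], ct.loc[1, 0], ct.loc[0, 1], ct.loc[0, 0]
print(a, b, c, d)  # 207 282 231 242, matching the ('x1', 'x2') entry above
```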

pypair/spark.py

+50
@@ -0,0 +1,50 @@
from itertools import combinations

from pypair.contigency import BinaryMeasures


def binary_binary(sdf):
    """
    Computes binary-binary association measures for every pair of columns in a
    Spark DataFrame of 0/1 values. Returns a dictionary mapping each sorted
    (column, column) pair to a dictionary of measures.
    """
    def to_counts(d):
        # Turn one row (as a dict) into [(pair_key, (a, b, c, d)), ...] for every column
        # pair, where a, b, c, d flag the value combinations (1,1), (1,0), (0,1), (0,0).
        def as_key(k1, k2):
            keys = sorted([k1, k2])
            return keys[0], keys[1]

        def as_count(v1, v2):
            a, b, c, d = 0, 0, 0, 0
            if v1 is not None and v2 is not None:
                if v1 == 1 and v2 == 1:
                    a = 1
                elif v1 == 1 and v2 == 0:
                    b = 1
                elif v1 == 0 and v2 == 1:
                    c = 1
                else:
                    d = 1
            return a, b, c, d

        def transform(k1, k2):
            v1, v2 = d[k1], d[k2]
            return as_key(k1, k2), as_count(v1, v2)

        return [transform(k1, k2) for k1, k2 in combinations(d.keys(), 2)]

    def add_counts(a, b):
        # Element-wise sum of two (a, b, c, d) count tuples.
        return a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3]

    def to_results(counts):
        # Clip each cell to at least 1 to avoid degenerate tables, then compute all measures.
        (x1, x2), (a, b, c, d) = counts
        a = max(1, a)
        b = max(1, b)
        c = max(1, c)
        d = max(1, d)
        computer = BinaryMeasures(a, b, c, d)
        measures = {m: computer.get(m) for m in computer.measures()}
        return (x1, x2), measures

    results = sdf.rdd \
        .flatMap(lambda r: to_counts(r.asDict())) \
        .reduceByKey(lambda a, b: add_counts(a, b)) \
        .sortByKey() \
        .map(lambda counts: to_results(counts)) \
        .collect()
    return {tup[0]: tup[1] for tup in results}
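A quick usage sketch for the new `binary_binary` API (the Spark session settings are assumptions and the data mirrors the test below; the measure names in each result dictionary are whatever `BinaryMeasures.measures()` reports):

```python
import pandas as pd
from pyspark.sql import SparkSession

from pypair.spark import binary_binary

# Assumed local session, just for illustration.
spark = SparkSession.builder.master('local[2]').appName('binary-binary-demo').getOrCreate()

get_data = lambda x, y, n: [(x, y) * 2 for _ in range(n)]
data = get_data(1, 1, 207) + get_data(1, 0, 282) + get_data(0, 1, 231) + get_data(0, 0, 242)
sdf = spark.createDataFrame(pd.DataFrame(data, columns=['x1', 'x2', 'x3', 'x4']))

results = binary_binary(sdf)
# results maps ('x1', 'x2') -> {measure_name: value, ...}
print(results[('x1', 'x2')])

spark.stop()
```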

requirements.txt

+1
@@ -10,6 +10,7 @@ numpy
 scipy
 pandas
 scikit-learn
+pyspark
 # DOCUMENTATION
 sphinx
 sphinx_rtd_theme

setup.py

+1 −1
@@ -14,7 +14,7 @@
     long_description_content_type='text/markdown',
     url='https://github.com/oneoffcoder/py-pair',
     keywords=' '.join(['statistics', 'pairwise', 'association', 'correlation', 'measurement', 'strength']),
-    install_requires=['scipy', 'numpy', 'pandas', 'scikit-learn'],
+    install_requires=['scipy', 'numpy', 'pandas', 'scikit-learn', 'pyspark'],
     classifiers=[
         'Programming Language :: Python :: 3',
         'License :: OSI Approved :: Apache Software License',

tests/test_spark.py

+45
@@ -0,0 +1,45 @@
import logging
import unittest

import pandas as pd
from pyspark.sql import SparkSession

from pypair.spark import binary_binary


class PySparkTest(unittest.TestCase):
    @classmethod
    def supress_py4j_logging(cls):
        logger = logging.getLogger('py4j')
        logger.setLevel(logging.WARN)

    @classmethod
    def create_pyspark_session(cls):
        return (SparkSession.builder
                .master('local[4]')
                .appName('local-testing-pyspark')
                .getOrCreate())

    @classmethod
    def setUpClass(cls):
        cls.supress_py4j_logging()
        cls.spark = cls.create_pyspark_session()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def get_binary_binary_data(self):
        get_data = lambda x, y, n: [(x, y) * 2 for _ in range(n)]
        data = get_data(1, 1, 207) + get_data(1, 0, 282) + get_data(0, 1, 231) + get_data(0, 0, 242)
        pdf = pd.DataFrame(data, columns=['x1', 'x2', 'x3', 'x4'])
        sdf = self.spark.createDataFrame(pdf)
        return sdf


class BinaryBinaryTest(PySparkTest):
    def test(self):
        sdf = self.get_binary_binary_data()
        result = binary_binary(sdf)
        import json
        print(json.dumps({f'{k[0]}_{k[1]}': v for k, v in result.items()}, indent=1))
