In [1]:
# Set the PySpark environment variables
import os
os.environ['SPARK_HOME'] = "C:\spark"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [2]:
# Import PySpark
from pyspark.sql import SparkSession

In [3]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("PySpark-Get-Started") \
    .getOrCreate()

In [4]:
# Test the setup
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext, SparkConf

spark_conf = SparkConf()\
  .setAppName("YourTest")\
  .setMaster("local[*]")

sc = SparkContext.getOrCreate(spark_conf)

In [3]:
sc

In [4]:
nums = list(range(0, 1000001))
len(nums)

1000001

In [5]:
nums_rdd = sc.parallelize(nums)
nums_rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287

In [6]:
nums_rdd.collect()

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,


In [7]:
nums_rdd.take(5)

[0, 1, 2, 3, 4]

In [8]:
3 ** 2

9

In [9]:
squared_nums_rdd = nums_rdd.map(lambda x: x ** 2)
squared_nums_rdd.take(5)

[0, 1, 4, 9, 16]

In [10]:
len(str(546))

3

In [11]:
pairs = squared_nums_rdd.map(lambda x: (x, len(str(x))))
pairs.take(25)

[(0, 1),
 (1, 1),
 (4, 1),
 (9, 1),
 (16, 2),
 (25, 2),
 (36, 2),
 (49, 2),
 (64, 2),
 (81, 2),
 (100, 3),
 (121, 3),
 (144, 3),
 (169, 3),
 (196, 3),
 (225, 3),
 (256, 3),
 (289, 3),
 (324, 3),
 (361, 3),
 (400, 3),
 (441, 3),
 (484, 3),
 (529, 3),
 (576, 3)]

In [12]:
even_digit_pairs = pairs.filter(lambda x: (x[1] % 2) == 0)
even_digit_pairs.take(25)

[(16, 2),
 (25, 2),
 (36, 2),
 (49, 2),
 (64, 2),
 (81, 2),
 (1024, 4),
 (1089, 4),
 (1156, 4),
 (1225, 4),
 (1296, 4),
 (1369, 4),
 (1444, 4),
 (1521, 4),
 (1600, 4),
 (1681, 4),
 (1764, 4),
 (1849, 4),
 (1936, 4),
 (2025, 4),
 (2116, 4),
 (2209, 4),
 (2304, 4),
 (2401, 4),
 (2500, 4)]

In [13]:
flipped_pairs = even_digit_pairs.map(lambda x: (x[1], x[0]))
flipped_pairs.take(25)

[(2, 16),
 (2, 25),
 (2, 36),
 (2, 49),
 (2, 64),
 (2, 81),
 (4, 1024),
 (4, 1089),
 (4, 1156),
 (4, 1225),
 (4, 1296),
 (4, 1369),
 (4, 1444),
 (4, 1521),
 (4, 1600),
 (4, 1681),
 (4, 1764),
 (4, 1849),
 (4, 1936),
 (4, 2025),
 (4, 2116),
 (4, 2209),
 (4, 2304),
 (4, 2401),
 (4, 2500)]

In [14]:
grouped = flipped_pairs.groupByKey()
grouped.take(25)

[(12, <pyspark.resultiterable.ResultIterable at 0x18d22a593c0>),
 (2, <pyspark.resultiterable.ResultIterable at 0x18d22a59780>),
 (4, <pyspark.resultiterable.ResultIterable at 0x18d22a59cf0>),
 (6, <pyspark.resultiterable.ResultIterable at 0x18d22a5a9b0>),
 (8, <pyspark.resultiterable.ResultIterable at 0x18d22a59d80>),
 (10, <pyspark.resultiterable.ResultIterable at 0x18d22a581f0>)]

In [15]:
grouped = grouped.map(lambda x: (x[0], list(x[1])))
grouped.take(2)

[(12,
  [100000147984,
   100000780441,
   100001412900,
   100002045361,
   100002677824,
   100003310289,
   100003942756,
   100004575225,
   100005207696,
   100005840169,
   100006472644,
   100007105121,
   100007737600,
   100008370081,
   100009002564,
   100009635049,
   100010267536,
   100010900025,
   100011532516,
   100012165009,
   100012797504,
   100013430001,
   100014062500,
   100014695001,
   100015327504,
   100015960009,
   100016592516,
   100017225025,
   100017857536,
   100018490049,
   100019122564,
   100019755081,
   100020387600,
   100021020121,
   100021652644,
   100022285169,
   100022917696,
   100023550225,
   100024182756,
   100024815289,
   100025447824,
   100026080361,
   100026712900,
   100027345441,
   100027977984,
   100028610529,
   100029243076,
   100029875625,
   100030508176,
   100031140729,
   100031773284,
   100032405841,
   100033038400,
   100033670961,
   100034303524,
   100034936089,
   100035568656,
   100036201225,
   10003

In [16]:
averaged = grouped.map(lambda x: (x[0], sum(x[1]) / len(x[1]) ))
averaged.collect()

[(12, 472075391214.1667),
 (2, 45.166666666666664),
 (4, 4675.5),
 (6, 471838.0),
 (8, 47204941.666666664),
 (10, 4720705565.0)]