In [1]:
import numpy as np
import sympy as sy

import pyspark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

sql = SparkSession.builder \
    .master("local") \
    .appName("euler 27") \
    .getOrCreate()

In [10]:
b = sy.primerange(1, 1000)
b = list(b)

b = sql.createDataFrame(b, IntegerType())

b.show(5)

+-----+
|value|
+-----+
|    2|
|    3|
|    5|
|    7|
|   11|
+-----+
only showing top 5 rows



In [11]:
a = sql.range(-1000, 1000)
a.show(5)

+-----+
|   id|
+-----+
|-1000|
| -999|
| -998|
| -997|
| -996|
+-----+
only showing top 5 rows



In [12]:
a.registerTempTable("table_a")
b.registerTempTable("table_b")

In [27]:
query = """
select
    a.id as A,
    b.value as B
from table_a as a
cross join table_b as b
"""

df = sql.sql(query)
df.count()

336000

In [23]:
def count_primes_impl(a, b):
    f = lambda x: x**2 + a*x + b
    n = 0
    
    while sy.isprime(f(n)):
        n += 1
    
    return n - 1

count_primes = F.udf(count_primes_impl, IntegerType())

In [28]:
df = df.withColumn("product", df["A"] * df["B"])
df = df.withColumn("prime_seq_len", count_primes("A", "B"))
df.show(5)

+-----+---+-------+-------------+
|    A|  B|product|prime_seq_len|
+-----+---+-------+-------------+
|-1000|  2|  -2000|            0|
| -999|  2|  -1998|            0|
| -998|  2|  -1996|            0|
| -997|  2|  -1994|            0|
| -996|  2|  -1992|            0|
+-----+---+-------+-------------+
only showing top 5 rows



In [26]:
df.orderBy("prime_seq_len", ascending=False).show(10)

+---+---+-------------+
|  A|  B|prime_seq_len|
+---+---+-------------+
|-61|971|           70|
|-59|911|           69|
|-57|853|           68|
|-55|797|           67|
|-53|743|           66|
|-51|691|           65|
|-49|641|           64|
|-47|593|           63|
|-45|547|           62|
|-43|503|           61|
+---+---+-------------+
only showing top 10 rows

