In [32]:
from datetime import timedelta
import math

from pyspark.sql import SparkSession, functions as F
from pyspark.sql import types as T

from prefect import task, flow
from prefect.tasks import task_input_hash

In [34]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('plexure') \
    .getOrCreate()

# Define schema
schema = T.StructType([
    T.StructField("type", T.StringType(), True),
    T.StructField("width", T.DoubleType(), True),
    T.StructField("height", T.DoubleType(), True),
    T.StructField("base", T.DoubleType(), True),
    T.StructField("radius", T.DoubleType(), True)
])

# Read JSON lines file into a DataFrame
df = spark.read.json('dummy_data.jsonl', schema=schema, multiLine=False)

In [35]:
df.show(20)

+---------+-----+------+----+------+
|     type|width|height|base|radius|
+---------+-----+------+----+------+
| triangle| null|   8.0| 2.0|  null|
| rectngle|  9.0|   9.0|null|  null|
|   circle| null|  null|null|  10.0|
| rectngle|  2.0|   6.0|null|  null|
|  polygon| null|  null|null|  null|
|  polygon| null|  null|null|  null|
| rectngle|  5.0|   6.0|null|  null|
| triangle| null|   2.0| 1.0|  null|
|   circle| null|  null|null|  null|
|   circle| null|  null|null|   1.0|
| triangle| null|  10.0| 4.0|  null|
|  polygon| null|  null|null|  null|
| rectngle|  8.0|   8.0|null|  null|
|   circle| null|  null|null|   1.0|
|   circle| null|  null|null|   4.0|
|rectangle| null|  -1.0|null|  null|
| triangle| null|   6.0| 2.0|  null|
|  polygon| null|  null|null|  null|
|  ellipse| null|  null|null|  null|
| rectngle|  2.0|   1.0|null|  null|
+---------+-----+------+----+------+
only showing top 20 rows



##### count  how many different shapes

In [36]:
df.select('type').distinct().count()

7

##### list values for all [type] keys

In [37]:
df.select('type').distinct().show()

+---------+
|     type|
+---------+
|   circle|
| rectngle|
|     null|
| triangle|
|rectangle|
|  ellipse|
|  polygon|
+---------+



##### Calculate average (only meaningful if these fields are numerical):

In [38]:
df.groupBy('type').agg(
    F.avg('width').alias('avg_width'),
    F.avg('height').alias('avg_height'),
    F.avg('base').alias('avg_base'),
    F.avg('radius').alias('avg_radius')
).show()

+---------+-----------------+-----------------+-----------------+-----------------+
|     type|        avg_width|       avg_height|         avg_base|       avg_radius|
+---------+-----------------+-----------------+-----------------+-----------------+
|   circle|             null|             null|             null|5.513812154696133|
| rectngle|5.366279069767442|5.244186046511628|             null|             null|
|     null|             null|             null|             -1.0|             null|
| triangle|             null|5.433333333333334|5.655555555555556|             null|
|rectangle|             null|             -1.0|             null|             null|
|  ellipse|             null|             null|             null|             null|
|  polygon|             null|             null|             null|             null|
+---------+-----------------+-----------------+-----------------+-----------------+



##### Count nulls in each column

In [42]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()


+----+-----+------+----+------+
|type|width|height|base|radius|
+----+-----+------+----+------+
|  22|  828|   624| 798|   819|
+----+-----+------+----+------+



### calculation

In [23]:
df = df.withColumn('area',
                   F.when(F.col('type') == 'rectangle', F.col('width') * F.col('height')) \
                   .when(F.col('type') == 'triangle', 0.5 * F.col('base') * F.col('height')) \
                   .when(F.col('type') == 'circle', math.pi * F.pow(F.col('radius'), 2))
                   )

# Calculate the total area
total_area = df.select(F.sum('area')).first()[0]

In [11]:
print(total_area)

50563.2933425039
