In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, StringType
from pyspark.sql.functions import col, like, sum, avg, max, min

In [2]:
# Start (or reuse) a Spark session whose master is "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

24/02/17 05:55:20 WARN Utils: Your hostname, codespaces-01519a resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/02/17 05:55:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/17 05:55:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Explicit schema for the two-column CSV: a string `station` and a float
# `measure`, both nullable.
schema = StructType([
    StructField(name="station", dataType=StringType(), nullable=True),
    StructField(name="measure", dataType=FloatType(), nullable=True),
])

In [4]:
# Reader options: fields are separated by ';' and the file has no header row.
csv_options = {"delimiter": ';', "header": False}
df = spark.read.options(**csv_options).csv(
    "data/weather_stations.csv",
    schema=schema,
)

# Peek at the first `station` value only. first() fetches a single row, whereas
# collect()[0] materialises every row on the driver just to read one of them.
# first() returns None for an empty DataFrame, so guard the attribute access.
first_row = df.select(col('station')).first()
first_row.station if first_row is not None else None

In [5]:
# Drop every row whose `station` value starts with '#'.
starts_with_hash = col('station').like('#%')
df = df.filter(~starts_with_hash)

# Register the filtered frame so it can be queried from SQL as `measurements`.
df.createOrReplaceTempView("measurements")

# Per-station max / min / avg of `measure`, grouped and sorted by station
# (ordinal 1 refers to the first select column).
# SQL LIKE wildcards are '%' and '_', not '*': the previous pattern '*#*' only
# matched the literal text "*#*", so rows starting with '#' were never excluded.
query = """
    select
        station, max(measure) as max, min(measure) as min, avg(measure) as avg
    from
        measurements
    where
        station not like '#%'
    group by
        1
    order by 
        1
"""

%%timeit
res = spark.sql(query)

res.show(10, truncate = False)

# Count the distinct station names in the `measurements` view, ignoring any
# station that starts with '#'.
distinct_station_count = spark.sql(""" 
    select count(distinct station)
    from measurements
    where station not like '#%'
""")
distinct_station_count.show()

# DataFrame-API version of the aggregation: max / min / avg of `measure` per
# station, sorted by station name.
measure_aggregates = [
    max('measure').alias('max'),
    min('measure').alias('min'),
    avg('measure').alias('avg'),
]
res = df.groupBy('station').agg(*measure_aggregates).orderBy('station')

from pyspark.sql.functions import expr

# Render one "station=min/avg/max" string per station, rounded to one decimal.
# The separator is no longer baked into each row, so the printed line does not
# end with a dangling ", ". `res` is left untouched instead of being rebound to
# a differently-typed object at every step.
station_summaries = res.select(
    expr("station || '=' || round(min,1) || '/' || round(avg,1) || '/' || round(max,1)").alias('myString')
)

# Join on the driver so ', ' appears only between entries. A row whose
# concatenation evaluated to NULL is skipped rather than printed as "None".
print(', '.join(
    row.myString
    for row in station_summaries.collect()
    if row.myString is not None
))

In [6]:
# Recompute the per-station aggregate (max / min / avg of `measure`), ordered
# by station name, as a DataFrame.
measurements_by_station = df.groupBy('station')
res = measurements_by_station.agg(
    max('measure').alias('max'),
    min('measure').alias('min'),
    avg('measure').alias('avg'),
).orderBy('station')

In [7]:
# Preview the first five aggregated rows.
res.show(n=5)

                                                                                

+--------+-------+-------+------------------+
| station|    max|    min|               avg|
+--------+-------+-------+------------------+
|A Coruña|43.3667|43.3667|    43.36669921875|
|A Yun Pa|13.3939|13.3939|13.393899917602539|
|Aabenraa|55.0444|55.0444| 55.04439926147461|
|  Aachen|50.7756|50.7756| 50.77560043334961|
|  Aadorf|47.4939|47.4939|47.493900299072266|
+--------+-------+-------+------------------+
only showing top 5 rows



In [8]:
def _summarise(row):
    """Return "station=min/avg/max" for one aggregated row, each value rounded to 2 places."""
    rounded = (str(round(value, 2)) for value in (row.min, row.avg, row.max))
    return row.station + '=' + '/'.join(rounded)


# Format every aggregated row via the RDD API and bring the strings to the driver.
res.rdd.map(_summarise).collect()

                                                                                

['A Coruña=43.37/43.37/43.37',
 'A Yun Pa=13.39/13.39/13.39',
 'Aabenraa=55.04/55.04/55.04',
 'Aachen=50.78/50.78/50.78',
 'Aadorf=47.49/47.49/47.49',
 'Aalborg=57.05/57.05/57.05',
 'Aalen=48.83/48.83/48.83',
 'Aaley=33.8/33.8/33.8',
 'Aalsmeer=52.27/52.27/52.27',
 'Aalst=50.94/50.94/50.94',
 'Aalten=51.93/51.93/51.93',
 'Aarau=47.4/47.4/47.4',
 'Aarhus=56.16/56.16/56.16',
 'Aarschot=50.98/50.98/50.98',
 'Aarsâl=34.18/34.18/34.18',
 'Aartselaar=51.13/51.13/51.13',
 'Aasiaat=68.71/68.71/68.71',
 'Aba=5.12/5.12/5.12',
 'Abadan=38.05/38.05/38.05',
 'Abadiânia=-16.2/-16.2/-16.2',
 'Abadla=31.02/31.02/31.02',
 'Abadou=31.58/31.58/31.58',
 'Abaetetuba=-1.72/-1.72/-1.72',
 'Abaeté=-19.16/-19.16/-19.16',
 'Abaiara=-7.36/-7.36/-7.36',
 'Abaji=8.48/8.48/8.48',
 'Abakaliki=6.32/6.32/6.32',
 'Abakan=53.72/53.72/53.72',
 'Abalessa=22.89/22.89/22.89',
 'Abancay=-13.63/-13.63/-13.63',
 'Abangaritos=10.25/10.25/10.25',
 'Abano Terme=45.36/45.36/45.36',
 'Abarkūh=31.13/31.13/31.13',
 'Abarán=38.2/38.2/

24/02/17 05:55:39 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
