In [6]:
%load_ext autoreload

import warnings
warnings.filterwarnings("ignore") # disable warnings

from os import listdir
from os.path import join
import csv, sys
import dateutil.parser
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import (StringType, DoubleType, TimestampType, NullType, IntegerType, StructType, StructField)

from IPython.core.interactiveshell import InteractiveShell

Config settings

In [14]:
# For IPython

InteractiveShell.ast_node_interactivity = "all" # To show all output after each cell execution (instead of the last output)

# For HDFS
HDFS_DEFAULT = "hdfs://alakazam.fib.upc.es:27000"
HDFS_USER = "bdm"
HDFS_HOME = "/user/{}".format(HDFS_USER)

# For HDFS Path

hdfs_home = "{}{}".format(HDFS_DEFAULT, HDFS_HOME)

# For events
activities_dir = join("data", "events", "activities")
culture_dir = join("data", "events", "culture")
tourist_points_dir = join("data", "events", "tourist_points")

# For specific file
data_date = "20220404"

activities_file = "{}/{}/{}".format(hdfs_home, activities_dir, "activities_{}.parquet".format(data_date))
culture_file = "{}/{}/{}".format(hdfs_home, culture_dir, "culture_{}.parquet".format(data_date))
tourist_points_file = "{}/{}/{}".format(hdfs_home, tourist_points_dir, "tourist_points_{}.parquet".format(data_date))

In [15]:
activities_file
culture_file
tourist_points_file

'hdfs://alakazam.fib.upc.es:27000/user/bdm/data/events/activities/activities_20220404.parquet'

'hdfs://alakazam.fib.upc.es:27000/user/bdm/data/events/culture/culture_20220404.parquet'

'hdfs://alakazam.fib.upc.es:27000/user/bdm/data/events/tourist_points/tourist_points_20220404.parquet'

In [16]:
spark = SparkSession.builder.appName("bdm").master('local').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [17]:
df_activities = spark.read.format("parquet").load(activities_file)
df_culture = spark.read.format("parquet").load(culture_file)
df_tourist_points = spark.read.format("parquet").load(tourist_points_file)

In [21]:
activities_cols = [
    'register_id', 'name', 'geo_epgs_4326_x', 'geo_epgs_4326_y', # Must
    'addresses_neighborhood_id', 'addresses_neighborhood_name', # For neighborhood's query
    'addresses_district_id', 'addresses_district_name', # For district query
    'addresses_road_name', 'addresses_road_id' # Maybe useful to search events on that road
]

In [20]:
df_activities
df_culture
df_tourist_points

DataFrame[addresses_roadtype_name: int, addresses_end_street_number: bigint, values_attribute_name: string, addresses_road_name: string, values_category: string, addresses_zip_code: bigint, secondary_filters_id: bigint, values_value: string, addresses_town: string, geo_epgs_4326_y: double, geo_epgs_4326_x: double, secondary_filters_name: string, secondary_filters_tree: bigint, addresses_district_name: string, geo_epgs_25831_x: double, addresses_start_street_number: bigint, register_id: string, institution_id: bigint, addresses_main_address: boolean, addresses_district_id: bigint, addresses_roadtype_id: int, addresses_type: int, addresses_neighborhood_id: bigint, _id: bigint, name: string, addresses_road_id: bigint, created: timestamp, geo_epgs_25831_y: double, institution_name: string, modified: timestamp, secondary_filters_asia_id: bigint, secondary_filters_fullpath: string, values_description: string, values_id: bigint, addresses_neighborhood_name: string, values_outstanding: boolean

DataFrame[addresses_roadtype_name: timestamp, addresses_end_street_number: string, values_attribute_name: string, addresses_road_name: string, values_category: string, addresses_zip_code: string, secondary_filters_id: string, values_value: string, addresses_town: string, geo_epgs_4326_y: double, geo_epgs_4326_x: double, secondary_filters_name: int, secondary_filters_tree: int, addresses_district_name: string, geo_epgs_25831_x: double, addresses_start_street_number: string, register_id: string, institution_id: string, addresses_main_address: string, addresses_district_id: string, addresses_roadtype_id: timestamp, addresses_type: string, addresses_neighborhood_id: string, _id: bigint, name: string, addresses_road_id: string, created: string, geo_epgs_25831_y: double, institution_name: string, modified: timestamp, secondary_filters_asia_id: int, secondary_filters_fullpath: int, values_description: string, values_id: string, addresses_neighborhood_name: string, values_outstanding: string, 

DataFrame[addresses_roadtype_name: int, addresses_end_street_number: bigint, values_attribute_name: string, addresses_road_name: string, values_category: string, addresses_zip_code: bigint, secondary_filters_id: bigint, values_value: string, addresses_town: string, geo_epgs_4326_y: double, geo_epgs_4326_x: double, secondary_filters_name: string, secondary_filters_tree: bigint, addresses_district_name: string, geo_epgs_25831_x: double, addresses_start_street_number: bigint, register_id: string, institution_id: bigint, addresses_main_address: boolean, addresses_district_id: bigint, addresses_roadtype_id: int, addresses_type: int, addresses_neighborhood_id: bigint, _id: bigint, name: string, addresses_road_id: bigint, created: timestamp, geo_epgs_25831_y: double, institution_name: string, modified: timestamp, secondary_filters_asia_id: bigint, secondary_filters_fullpath: string, values_description: string, values_id: bigint, addresses_neighborhood_name: string, values_outstanding: boolean

#### Helper methods

In [7]:
def csv_to_dataFrame(sqlCtx, rdd, columns=None, sep=",", parseDate=True):
    """
    Converts CSV plain text RDD into SparkSQL DataFrame (former SchemaRDD) using PySpark. If columns not given, assumes first row is the header. If separator not given, assumes comma separated
    """
    py_version = sys.version_info[0]
    if py_version < 3:
        def toRow(line): return toRowSep(line.encode('utf-8'), sep)
    else:
        def toRow(line): return toRowSep(line, sep)

    rdd_array = rdd.map(toRow)
    rdd_sql = rdd_array

    if columns is None:
        columns = rdd_array.first()
        rdd_sql = rdd_array.zipWithIndex().filter(
            lambda r_i: r_i[1] > 0).keys()
    column_types = evaluateType(rdd_sql, parseDate)

    def toSqlRow(row):
        return toSqlRowWithType(row, column_types)

    schema = makeSchema(zip(columns, column_types))

    return sqlCtx.createDataFrame(rdd_sql.map(toSqlRow), schema=schema)


def makeSchema(columns):
    struct_field_map = {'string': StringType(),
                        'date': TimestampType(),
                        'double': DoubleType(),
                        'int': IntegerType(),
                        'none': NullType()}
    fields = [StructField(k, struct_field_map[v], True) for k, v in columns]

    return StructType(fields)


def toRowSep(line, d):
    """Parses one row using csv reader"""
    for r in csv.reader([line], delimiter=d):
        return r


def toSqlRowWithType(row, col_types):
    """Convert to sql.Row"""
    d = row
    for col, data in enumerate(row):
        typed = col_types[col]
        if isNone(data):
            d[col] = None
        elif typed == 'string':
            d[col] = data
        elif typed == 'int':
            d[col] = int(round(float(data)))
        elif typed == 'double':
            d[col] = float(data)
        elif typed == 'date':
            d[col] = toDate(data)
    return d


# Type converter
def isNone(d):
    return (d is None or d == 'None' or
            d == '?' or
            d == '' or
            d == 'NULL' or
            d == 'null')


def toDate(d):
    return dateutil.parser.parse(d)


def getRowType(row):
    """Infers types for each row"""
    d = row
    for col, data in enumerate(row):
        try:
            if isNone(data):
                d[col] = 'none'
            else:
                num = float(data)
                if num.is_integer():
                    d[col] = 'int'
                else:
                    d[col] = 'double'
        except:
            try:
                toDate(data)
                d[col] = 'date'
            except:
                d[col] = 'string'
    return d


def getRowTypeNoDate(row):
    """Infers types for each row"""
    d = row
    for col, data in enumerate(row):
        try:
            if isNone(data):
                d[col] = 'none'
            else:
                num = float(data)
                if num.is_integer():
                    d[col] = 'int'
                else:
                    d[col] = 'double'
        except:
            d[col] = 'string'
    return d


def reduceTypes(a, b):
    """Reduces column types among rows to find common denominator"""
    type_order = {'string': 0, 'date': 1, 'double': 2, 'int': 3, 'none': 4}
    reduce_map = {'int': {0: 'string', 1: 'string', 2: 'double'},
                  'double': {0: 'string', 1: 'string'},
                  'date': {0: 'string'}}
    d = a
    for col, a_type in enumerate(a):
        # a_type = a[col]
        b_type = b[col]
        if a_type == 'none':
            d[col] = b_type
        elif b_type == 'none':
            d[col] = a_type
        else:
            order_a = type_order[a_type]
            order_b = type_order[b_type]
            if order_a == order_b:
                d[col] = a_type
            elif order_a > order_b:
                d[col] = reduce_map[a_type][order_b]
            elif order_a < order_b:
                d[col] = reduce_map[b_type][order_a]
    return d


def evaluateType(rdd_sql, parseDate):
    if parseDate:
        return rdd_sql.map(getRowType).reduce(reduceTypes)
    else:
        return rdd_sql.map(getRowTypeNoDate).reduce(reduceTypes)

In [8]:
df = spark.read.format("parquet").load(activities_file)

                                                                                

In [9]:
df

DataFrame[addresses_roadtype_name: int, addresses_end_street_number: bigint, values_attribute_name: string, addresses_road_name: string, values_category: string, addresses_zip_code: bigint, secondary_filters_id: bigint, values_value: string, addresses_town: string, geo_epgs_4326_y: double, geo_epgs_4326_x: double, secondary_filters_name: string, secondary_filters_tree: bigint, addresses_district_name: string, geo_epgs_25831_x: double, addresses_start_street_number: bigint, register_id: string, institution_id: bigint, addresses_main_address: boolean, addresses_district_id: bigint, addresses_roadtype_id: int, addresses_type: int, addresses_neighborhood_id: bigint, _id: bigint, name: string, addresses_road_id: bigint, created: timestamp, geo_epgs_25831_y: double, institution_name: string, modified: timestamp, secondary_filters_asia_id: bigint, secondary_filters_fullpath: string, values_description: string, values_id: bigint, addresses_neighborhood_name: string, values_outstanding: boolean

In [21]:
#df = spark.read.format("csv").load(activities_file)
# spark.read.csv(activities_file, sep=",", header=0)
df_rdd = sc.textFile(activities_file)
df = csv_to_dataFrame(sqlContext, df_rdd)

In [30]:
_df = df.to_pandas_on_spark()



In [36]:
type(_df)

pyspark.pandas.frame.DataFrame

In [37]:
_df = _df.drop(columns=["secondary_filters_asia_id"], axis=1)

In [40]:
_df['addresses_roadtype_name'].unique()

22/05/31 19:30:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/31 19:30:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/31 19:30:37 ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 15)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(

Py4JJavaError: An error occurred while calling o798.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15) (alakazam.fib.upc.es executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/session.py", line 682, in prepare
    verify_func(obj)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/types.py", line 1392, in verify_struct
    verifier(v)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/types.py", line 1333, in verify_integer
    raise ValueError(
ValueError: field secondary_filters_asia_id: object of IntegerType out of range, got: 65103003001003

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/session.py", line 682, in prepare
    verify_func(obj)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/types.py", line 1392, in verify_struct
    verifier(v)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/home/bdm/miniconda3/envs/bdm/lib/python3.8/site-packages/pyspark/sql/types.py", line 1333, in verify_integer
    raise ValueError(
ValueError: field secondary_filters_asia_id: object of IntegerType out of range, got: 65103003001003

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


In [12]:
df.toPandas().head(2)

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c27,_c28,_c29,_c30,_c31,_c32,_c33,_c34,_c35,_c36
0,addresses_roadtype_name,addresses_end_street_number,values_attribute_name,addresses_road_name,values_category,addresses_zip_code,secondary_filters_id,values_value,addresses_town,geo_epgs_4326_y,...,geo_epgs_25831_y,institution_name,modified,secondary_filters_asia_id,secondary_filters_fullpath,values_description,values_id,addresses_neighborhood_name,values_outstanding,values_attribute_id
1,,,Tel.,Av Estadi,Telèfons,8038,56732071,934255445,BARCELONA,2.146913285556539,...,4579669.4380555395,,2022-03-15T18:19:11.887664,65103003001003,Planol BCN >> Educació >> Ensenyament reglat >...,,166525,el Poble-sec,True,20001


In [23]:
# spark = SparkSession.builder.master("local").appName("Pi").config(sc.getConf()).getOrCreate()
spark = SparkSession.builder.getOrCreate()

AttributeError: 'SparkConf' object has no attribute '_get_object_id'

In [3]:
sc.stop()