In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import json

In [2]:
# --- Spark configuration ---
APP_NAME: str = 'assignment1'
MASTER: str = 'local[*]'  # local mode, one worker thread per logical core

# --- Input files ---
INPUT_METADATA_FILE: str = 'tracks.csv'
INPUT_FEATURES_FILE: str = 'features.csv'

# --- Column names used on the imported datasets ---
COLUMN_TRACK_ID: str = 'track_id'
COLUMN_SUBSET: str = 'set_subset'

# --- Application values ---
SUBSET_SMALL_VALUE: str = 'small'  # COLUMN_SUBSET value selected for the analysis

In [3]:
# Build the Spark configuration once and derive both entry points from it, so
# the app name and master URL are declared in a single place.
conf = SparkConf().setAppName(APP_NAME).setMaster(MASTER)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext  # the SparkContext backing the session

23/04/13 13:16:20 WARN Utils: Your hostname, pedro-duarte resolves to a loopback address: 127.0.1.1; using 192.168.32.217 instead (on interface wlp2s0)
23/04/13 13:16:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/13 13:16:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
def try_decoding(content: str):
    """Decode a cell value as JSON when possible, otherwise return it unchanged.

    Turns textual cells such as ``'42'``, ``'3.5'`` or ``'[1, 2]'`` into typed
    Python values; text that is not valid JSON (e.g. ``"['a']"`` or plain
    words) is returned as-is.

    :param content: raw cell value, or ``None`` for an empty cell.
    :returns: ``None`` for ``None``; the decoded value when ``content`` is
        valid JSON text; otherwise ``content`` itself. Values that are not
        text (e.g. numbers produced by schema inference) are returned
        unchanged instead of making ``json.loads`` raise ``TypeError``.
    """
    if not isinstance(content, (str, bytes, bytearray)):
        # None and already-typed values need no decoding.
        return content
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        return content
    
def import_dataset(ds: str, header_size: int):
    """Load a CSV file whose header spans several rows into a DataFrame.

    The first ``header_size`` rows are treated as a multi-row header: for
    each column, its non-null header cells are joined with ``'_'`` to form
    the column name, except the first column, which is always named
    ``COLUMN_TRACK_ID``. Those rows and the row right after them
    (presumably the row labelling the id column; confirm against the input
    files) are removed from the data. Every remaining cell goes through
    ``try_decoding`` and the rows are re-read with ``spark.read.json``, which
    infers the final column types.

    :param ds: path of the CSV file to read.
    :param header_size: number of leading rows that make up the header.
    :returns: a Spark DataFrame with one row per remaining CSV line.
    """
    # inferSchema=False: all cells arrive as strings (or None), which is what
    # try_decoding expects, and Spark skips the extra pass over the file that
    # inference needs. Types are inferred afterwards by spark.read.json.
    raw_rows = spark.read.csv(ds, quote='"', escape='"', multiLine=True, inferSchema=False).rdd

    # A single action fetches both the header rows and the extra row to drop.
    leading_rows = raw_rows.take(header_size + 1)

    header_names = [
        COLUMN_TRACK_ID if idx == 0 else '_'.join(part for part in parts if part is not None)
        for idx, parts in enumerate(zip(*(list(row) for row in leading_rows[:header_size])))
    ]

    # Value-based filter (unchanged semantics): drops the leading rows and
    # any later row identical to one of them.
    data_rows = raw_rows.filter(lambda row: row not in leading_rows)

    json_rows = (
        data_rows
        .map(lambda row: {name: try_decoding(value) for name, value in zip(header_names, row)})
        .map(json.dumps)
    )

    return spark.read.json(json_rows)

In [5]:
# The first 2 rows of the metadata file are joined into flat column names.
tracks_ds = import_dataset(ds=INPUT_METADATA_FILE, header_size=2)
tracks_ds.schema

                                                                                

StructType([StructField('album_comments', LongType(), True), StructField('album_date_created', StringType(), True), StructField('album_date_released', StringType(), True), StructField('album_engineer', StringType(), True), StructField('album_favorites', LongType(), True), StructField('album_id', LongType(), True), StructField('album_information', StringType(), True), StructField('album_listens', LongType(), True), StructField('album_producer', StringType(), True), StructField('album_tags', StringType(), True), StructField('album_title', StringType(), True), StructField('album_tracks', LongType(), True), StructField('album_type', StringType(), True), StructField('artist_active_year_begin', StringType(), True), StructField('artist_active_year_end', StringType(), True), StructField('artist_associated_labels', StringType(), True), StructField('artist_bio', StringType(), True), StructField('artist_comments', LongType(), True), StructField('artist_date_created', StringType(), True), StructFi

In [6]:
# Keep only the "small" subset, then key each track row by its id for the
# join below. The filter is applied on the DataFrame so Spark can evaluate it
# before rows are converted into Python objects by `.rdd`.
# NOTE: this rebinds tracks_ds from a DataFrame to a pair RDD, so re-running
# this cell on its own requires re-running the previous cell first.
tracks_ds = tracks_ds \
  .where(tracks_ds[COLUMN_SUBSET] == SUBSET_SMALL_VALUE) \
  .rdd \
  .keyBy(lambda row: row[COLUMN_TRACK_ID])
tracks_ds.count() # 8000 lines

                                                                                

8000

In [7]:
# The first 3 rows of the features file form its header; key rows by track id.
features_ds = (
    import_dataset(ds=INPUT_FEATURES_FILE, header_size=3)
    .rdd
    .keyBy(lambda row: row[COLUMN_TRACK_ID])
)

                                                                                

In [10]:
# Inner join on track id; each element is (track_id, (track_row, feature_row)).
ds = tracks_ds.join(other=features_ds)
ds.count()  # 8000 lines, matching the filtered tracks count above

                                                                                

8000