In [1]:
import os
from pyspark import SparkContext
import json
from pprint import pprint

In [2]:
sc = SparkContext()

In [3]:
cd = os.getcwd()
project_dir = cd.split(os.sep)[0:len(cd.split(os.sep))-1]
input_dir = os.path.join(os.sep.join(project_dir), 'input')
fp_sample_input = os.path.join(input_dir, 'gsod_005.txt')
fp_desc_input = os.path.join(input_dir, 'GSOD_DESC.txt')

### 1. Build up main rdd.

In [4]:
with open(fp_desc_input, 'r') as df_desc:
    for line_n, line in enumerate(df_desc):
        line = df_desc.readline()
        if 'FIELD' in line and 'POSITION' in line and 'TYPE' in line and 'DESCRIPTION' in line:
            break
    data = df_desc.readlines()
    json_fields = {}
    for line in data:
        if len(json_fields.keys()) == 26:
            break
        elif line.split() != [] and line.split(' ')[0] != '':
            field_name = line.split()[0]
            field_type = line.split()[2]
            if str.upper(field_name) != field_name:
                field_name += "_" + list(json_fields.values())[-1][0]
            json_fields[line.split()[1]] = [field_name, field_type] 
        else:
            pass
pprint(json_fields)

{'1-6': ['STN---', 'Int.'],
 '103-108': ['MAX', 'Real'],
 '109-109': ['Flag_MAX', 'Char'],
 '111-116': ['MIN', 'Real'],
 '117-117': ['Flag_MIN', 'Char'],
 '119-123': ['PRCP', 'Real'],
 '124-124': ['Flag_PRCP', 'Char'],
 '126-130': ['SNDP', 'Real'],
 '133-138': ['FRSHTT', 'Int.'],
 '15-18': ['YEAR', 'Int.'],
 '19-22': ['MODA', 'Int.'],
 '25-30': ['TEMP', 'Real'],
 '32-33': ['Count_TEMP', 'Int.'],
 '36-41': ['DEWP', 'Real'],
 '43-44': ['Count_DEWP', 'Int.'],
 '47-52': ['SLP', 'Real'],
 '54-55': ['Count_SLP', 'Int.'],
 '58-63': ['STP', 'Real'],
 '65-66': ['Count_STP', 'Int.'],
 '69-73': ['VISIB', 'Real'],
 '75-76': ['Count_VISIB', 'Int.'],
 '79-83': ['WDSP', 'Real'],
 '8-12': ['WBAN', 'Int.'],
 '85-86': ['Count_WDSP', 'Int.'],
 '89-93': ['MXSPD', 'Real'],
 '96-100': ['GUST', 'Real']}


In [5]:
with open(fp_sample_input, 'r') as f:
    line = f.readline()
print(line)

030050 99999  19300101    41.4  4    39.7  4  9999.9  0  9999.9  0    7.8  4    6.0  4    8.9  999.9    45.0*   37.0  99.99  999.9  010000



In [6]:
def mapper(line, json_fields):
    rdd_line = []
    for key in json_fields:
        beg = int(key.split('-')[0])-1
        end = int(key.split('-')[1])
        # Convert fields to respective types
        field_type = json_fields[key][1]
        rdd_line = rdd_line + [int(line[beg:end]) if field_type == 'Int.' else (float(line[beg:end]) if field_type == 'Real' else line[beg:end])]
    return tuple(rdd_line)

pprint(mapper(line, json_fields))

(30050,
 99999,
 1930,
 101,
 41.4,
 4,
 39.7,
 4,
 9999.9,
 0,
 9999.9,
 0,
 7.8,
 4,
 6.0,
 4,
 8.9,
 999.9,
 45.0,
 '*',
 37.0,
 ' ',
 99.99,
 ' ',
 999.9,
 10000)


In [7]:
rdd_base = sc.textFile(fp_sample_input)
rdd_base.take(1)

['030050 99999  19300101    41.4  4    39.7  4  9999.9  0  9999.9  0    7.8  4    6.0  4    8.9  999.9    45.0*   37.0  99.99  999.9  010000']

In [8]:
rdd = rdd_base.map(lambda x: mapper(x,json_fields))
pprint(rdd.take(2))

[(30050,
  99999,
  1930,
  101,
  41.4,
  4,
  39.7,
  4,
  9999.9,
  0,
  9999.9,
  0,
  7.8,
  4,
  6.0,
  4,
  8.9,
  999.9,
  45.0,
  '*',
  37.0,
  ' ',
  99.99,
  ' ',
  999.9,
  10000),
 (30050,
  99999,
  1930,
  118,
  47.3,
  4,
  44.8,
  4,
  1004.0,
  4,
  9999.9,
  0,
  9.3,
  4,
  11.0,
  4,
  13.0,
  999.9,
  48.0,
  '*',
  46.0,
  ' ',
  0.0,
  'I',
  999.9,
  0)]


In [9]:
from operator import add

keys_repetition = rdd.distinct().keys().map(lambda x: (x, 1)).reduceByKey(add)

In [10]:
keys_repetition.take(5)

KeyboardInterrupt: 

### Clean up rdd based on field descriptions.

In [13]:
sc

In [14]:
def GroupByKey_1(fp_sample_input):
    rdd_base = sc.textFile(fp_sample_input)
    rdd_mapped = rdd_base.map(lambda x: mapper(x,json_fields))
    print('-------------------------------------------------------------')
    rdd_mapped = rdd_mapped.map(lambda x: (x, 1))
    rdd_group = rdd_mapped.groupByKey().map(lambda x:(x[0], len(x[1])))
    print(f'El número de resgistros por estación es: {rdd_group.take(1)}')

In [15]:
GroupByKey_1(fp_sample_input)

-------------------------------------------------------------
El número de resgistros por estación es: [((31590, 99999, 1930, 523, 49.2, 4, 45.8, 4, 1026.8, 4, 9999.9, 0, 12.4, 4, 2.7, 4, 5.1, 999.9, 54.0, ' ', 45.0, ' ', 0.0, 'I', 999.9, 0), 1)]


In [None]:
rdd1.explain()