In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.rdd import portable_hash
from pyspark.statcounter import StatCounter



spark = SparkSession.builder.appName("Taxi")\
        .config("spark.driver.memory", "6g")\
        .config("spark.driver.cores", "8")\
        .getOrCreate()
sc=spark.sparkContext
import random

## Generate Dummy Data
### Generate random number and generate repeated Primary Key to test different cases

In [2]:
x=sc.parallelize(random.sample(range(1,200),10)*100)
print("Primary Key X")
print(x.take(2))
y=sc.parallelize(random.sample(range(1,250000),1000))
print("Secondary Key Y")
print(y.take(2))

Primary Key X
[78, 77]
Secondary Key Y
[101124, 148360]


### Zip Data in one rdd

In [3]:
z=x.zip(y)
z.take(2)

[(78, 101124), (77, 148360)]

### Convert Data to string (if it is already string skip this step)

In [4]:
z_str=z.map(lambda x:(str(x[0]),str(x[1])))
z_str.take(2)

[('78', '101124'), ('77', '148360')]

## Creating One Universal Key That can handle both primary and secoundry keys (This can be applied to N Keys) 
### First get the length of the longest key for x and y

In [5]:
max_len_x=len(z_str.max(key=lambda x:len(x[0]))[0])
print(max_len_x)
max_len_y=len(z_str.max(key=lambda x:len(x[1]))[1])
print(max_len_y)


3
6


### Create The Universal Key, where any key with length less than the max length, I will add extra zeros before it, to let all the keys have the same length
### Then Append X and Y in one Key, all items of the new key shall have the same length

In [6]:
def createKey(x):
    s_1=x[0]
    s_2=x[1]
    for i in range (0,(max_len_x-len(s_1))):
        s_1="0"+s_1
    for i in range (0,(max_len_y-len(s_2))):
        s_2="0"+s_2
    universal_key=s_1+s_2
    return (universal_key,x)

z_str_key=z_str.map(createKey)
z_str_key.take(20)      

[('078101124', ('78', '101124')),
 ('077148360', ('77', '148360')),
 ('100197802', ('100', '197802')),
 ('167189749', ('167', '189749')),
 ('160154136', ('160', '154136')),
 ('045126922', ('45', '126922')),
 ('190163289', ('190', '163289')),
 ('116150731', ('116', '150731')),
 ('085108102', ('85', '108102')),
 ('046020029', ('46', '20029')),
 ('078025515', ('78', '25515')),
 ('077059950', ('77', '59950')),
 ('100113225', ('100', '113225')),
 ('167237630', ('167', '237630')),
 ('160102406', ('160', '102406')),
 ('045037172', ('45', '37172')),
 ('190059679', ('190', '59679')),
 ('116164482', ('116', '164482')),
 ('085192229', ('85', '192229')),
 ('046179977', ('46', '179977'))]

## The Final Result

In [7]:
z_str_sorted=z_str_key.sortByKey()
z_str_sorted.collect()

[('045004202', ('45', '4202')),
 ('045004255', ('45', '4255')),
 ('045004525', ('45', '4525')),
 ('045005521', ('45', '5521')),
 ('045008568', ('45', '8568')),
 ('045008771', ('45', '8771')),
 ('045009654', ('45', '9654')),
 ('045010356', ('45', '10356')),
 ('045012318', ('45', '12318')),
 ('045013858', ('45', '13858')),
 ('045015333', ('45', '15333')),
 ('045015552', ('45', '15552')),
 ('045015906', ('45', '15906')),
 ('045016548', ('45', '16548')),
 ('045020656', ('45', '20656')),
 ('045021359', ('45', '21359')),
 ('045026654', ('45', '26654')),
 ('045029050', ('45', '29050')),
 ('045029698', ('45', '29698')),
 ('045032099', ('45', '32099')),
 ('045033294', ('45', '33294')),
 ('045037172', ('45', '37172')),
 ('045044643', ('45', '44643')),
 ('045045402', ('45', '45402')),
 ('045047404', ('45', '47404')),
 ('045051839', ('45', '51839')),
 ('045055886', ('45', '55886')),
 ('045056325', ('45', '56325')),
 ('045056406', ('45', '56406')),
 ('045058134', ('45', '58134')),
 ('045062677', ('