In [135]:
## imports:
## SparkSession:   When you start pyspark you get a SparkSession object called spark by default. 
##   In a standalone Python application, you need to create your SparkSession object explicitly.
##   When you create a SparkSession, automaticaly creates a SparkContext
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

sc = SparkContext.getOrCreate()
sc

In [136]:
## definições
MASTER='local'
IP = '127.0.0.1'
DATABASE = 'm3bg1'
COLLECTIONS = ['c_address','city','customer','date','lineorder','nation','part','region','s_address','supplier']
MONGO_FORMAT = 'com.mongodb.spark.sql.DefaultSource'

DB_URL = "mongodb://{0}/{1}.".format(IP, DATABASE)

In [137]:
def newSession(collection):
    sparkSession = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", DB_URL+collection) \
    .config("spark.mongodb.output.uri", DB_URL+collection) \
    .getOrCreate()
    
    return sparkSession

def newCollection(collection):
    print('get', collection,'...')
    return spark.read.format(MONGO_FORMAT).option("uri",DB_URL+collection).load()

def newTempView(df, name):
    print('new view temp_'+name+'...')
    df.createOrReplaceTempView('temp_'+name)

In [138]:
##df = spark.read.format(SPARK_FORMAT).load()
print('LOAD COLLECTIONS FROM DATABASE...')

## LOAD COLLECTIONS FROM DATABASE
## OBS: MongoDB doesn't supports RDD in Python so needs to use DataFrames
c_address = newCollection('c_address')
city = newCollection('city')
customer = newCollection('customer')
date = newCollection('date')
lineorder = newCollection('lineorder')
nation = newCollection('nation')
part = newCollection('part')
region = newCollection('region')
s_address = newCollection('s_address')
supplier = newCollection('supplier')

LOAD COLLECTIONS FROM DATABASE...
get c_address ...
get city ...
get customer ...
get date ...
get lineorder ...
get nation ...
get part ...
get region ...
get s_address ...
get supplier ...


In [132]:
rdd = c_address.rdd

In [133]:
rdd

MapPartitionsRDD[484] at javaToPython at <unknown>:0

In [140]:
c_address.filter(c_address['c_address_geo.type'] == 'Point').show()

+--------------------+--------------------+------------+
|                 _id|       c_address_geo|c_address_pk|
+--------------------+--------------------+------------+
|[5b23444e70fbd137...|[Point, [-95.3267...|           0|
|[5b23444e70fbd137...|[Point, [-95.3929...|           1|
|[5b23444e70fbd137...|[Point, [-95.5152...|           2|
|[5b23444e70fbd137...|[Point, [-95.6069...|           3|
|[5b23444e70fbd137...|[Point, [-95.5922...|           4|
|[5b23444e70fbd137...|[Point, [-95.4752...|           5|
|[5b23444e70fbd137...|[Point, [-95.3928...|           6|
|[5b23444e70fbd137...|[Point, [-95.4733...|           7|
|[5b23444e70fbd137...|[Point, [-95.4203...|           8|
|[5b23444e70fbd137...|[Point, [-95.6829...|           9|
|[5b23444e70fbd137...|[Point, [-95.4375...|          10|
|[5b23444e70fbd137...|[Point, [-95.5099...|          11|
|[5b23444e70fbd137...|[Point, [-95.3525...|          12|
|[5b23444e70fbd137...|[Point, [-95.5186...|          13|
|[5b23444e70fbd137...|[Point, [

In [121]:
## TEMPORARIES VIEWS
newTempView(c_address, 'c_address')
newTempView(city, 'city')
newTempView(customer, 'customer')
newTempView(date, 'date')
newTempView(lineorder, 'lineorder')
newTempView(nation, 'nation')
newTempView(part, 'part')
newTempView(region, 'region')
newTempView(s_address, 's_address')
newTempView(supplier, 'supplier')

new view temp_c_address...
new view temp_city...
new view temp_customer...
new view temp_date...
new view temp_lineorder...
new view temp_nation...
new view temp_part...
new view temp_region...
new view temp_s_address...
new view temp_supplier...


In [127]:
res = spark.sql("select * from temp_c_address where c_address_geo like '[Point%'")

AnalysisException: "cannot resolve 'temp_c_address.`c_address_geo` LIKE '[Point%'' due to data type mismatch: argument 1 requires string type, however, 'temp_c_address.`c_address_geo`' is of struct<type:string,coordinates:array<double>> type.; line 1 pos 49;\n'Project [*]\n+- 'Filter c_address_geo#1103 LIKE [Point%\n   +- SubqueryAlias `temp_c_address`\n      +- Relation[_id#1102,c_address_geo#1103,c_address_pk#1104L] MongoRelation(MongoRDD[405] at RDD at MongoRDD.scala:51,Some(StructType(StructField(_id,StructType(StructField(oid,StringType,true)),true), StructField(c_address_geo,StructType(StructField(type,StringType,true), StructField(coordinates,ArrayType(DoubleType,true),true)),true), StructField(c_address_pk,LongType,true))))\n"

In [126]:
res.show()

+--------------------+--------------------+------------+
|                 _id|       c_address_geo|c_address_pk|
+--------------------+--------------------+------------+
|[5b23444e70fbd137...|[Point, [-95.3267...|           0|
|[5b23444e70fbd137...|[Point, [-95.3929...|           1|
|[5b23444e70fbd137...|[Point, [-95.5152...|           2|
|[5b23444e70fbd137...|[Point, [-95.6069...|           3|
|[5b23444e70fbd137...|[Point, [-95.5922...|           4|
|[5b23444e70fbd137...|[Point, [-95.4752...|           5|
|[5b23444e70fbd137...|[Point, [-95.3928...|           6|
|[5b23444e70fbd137...|[Point, [-95.4733...|           7|
|[5b23444e70fbd137...|[Point, [-95.4203...|           8|
|[5b23444e70fbd137...|[Point, [-95.6829...|           9|
|[5b23444e70fbd137...|[Point, [-95.4375...|          10|
|[5b23444e70fbd137...|[Point, [-95.5099...|          11|
|[5b23444e70fbd137...|[Point, [-95.3525...|          12|
|[5b23444e70fbd137...|[Point, [-95.5186...|          13|
|[5b23444e70fbd137...|[Point, [