In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import py2neo
from pymongo import MongoClient
import pandas as pd

In [2]:
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext

In [3]:
graph = py2neo.Graph("bolt://host.docker.internal:7687", auth=("italo", "italo"))

query = "MATCH (mv)<-[:ACTED_IN]-(th:Person{name:\"Tom Hanks\"})-[:DIRECTED]->(mv:Movie) RETURN mv.title, mv.released "
df_neo = graph.run(query).to_data_frame()
df_neo

Unnamed: 0,mv.title,mv.released
0,That Thing You Do,1996
1,That Thing You Do,1996


In [5]:
# needed for any cluster connection
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
# needed for options -- cluster, timeout, SQL++ (N1QL) query, etc.
from couchbase.options import (ClusterOptions, ClusterTimeoutOptions,
                               QueryOptions)

In [14]:
# Connect options - authentication
auth = PasswordAuthenticator('admin', 'admin123')

# Get a reference to our cluster
# NOTE: For TLS/SSL connection use 'couchbases://<your-ip-address>' instead
cluster = Cluster('couchbase://host.docker.internal', ClusterOptions(auth))

In [15]:
cb = cluster.bucket('travel-sample')
cb_coll = cb.scope("inventory").collection("airline")

In [17]:
result = cb_coll.get("airline_10")
print(result.content_as[str])

{'id': 10, 'type': 'airline', 'name': '40-Mile Air', 'iata': 'Q5', 'icao': 'MLA', 'callsign': 'MILE-AIR', 'country': 'United States'}


In [44]:
sql_query = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country = \'United States\''
row_iter = cluster.query(sql_query)
result = list(row_iter)

In [53]:
df = pd.DataFrame([row['airline'] for row in result])

In [54]:
df_spark2 = spark.createDataFrame(df)
df_spark2.show()

+-----------+-------------+----+----+-----+--------------------+-------+
|   callsign|      country|iata|icao|   id|                name|   type|
+-----------+-------------+----+----+-----+--------------------+-------+
|   MILE-AIR|United States|  Q5| MLA|   10|         40-Mile Air|airline|
|        TXW|United States|  TQ| TXW|10123|         Texas Wings|airline|
|     atifly|United States|  A1| A1F|10226|              Atifly|airline|
|     LOCAIR|United States|  ZQ| LOC|10748|              Locair|airline|
|  SASQUATCH|United States|  K5| SQH|10765|    SeaPort Airlines|airline|
|    ACE AIR|United States|  KO| AER|  109|Alaska Central Ex...|airline|
|     CITRUS|United States|  FL| TRS| 1316|     AirTran Airways|airline|
|       null|United States|  -+| --+|13391|            U.S. Air|airline|
|       null|United States|  WQ| PQW|13633| PanAm World Airways|airline|
|    BEMIDJI|United States|  CH| BMJ| 1442|    Bemidji Airlines|airline|
| BERING AIR|United States|  8E| BRG| 1472|        

In [55]:
df_spark2.select(col('country'), col('callsign')).show()

+-------------+-----------+
|      country|   callsign|
+-------------+-----------+
|United States|   MILE-AIR|
|United States|        TXW|
|United States|     atifly|
|United States|     LOCAIR|
|United States|  SASQUATCH|
|United States|    ACE AIR|
|United States|     CITRUS|
|United States|       null|
|United States|       null|
|United States|    BEMIDJI|
|United States| BERING AIR|
|United States|NIGHT CARGO|
|United States|     AIRMAX|
|United States|   Stallion|
|United States|       null|
|United States|       CAIR|
|United States|       USKY|
|United States|       ERAH|
|United States|       null|
|United States|      Orbit|
+-------------+-----------+
only showing top 20 rows



In [13]:
df_spark = spark.createDataFrame(df_neo)
df_spark.show()

+-----------------+-----------+
|         mv.title|mv.released|
+-----------------+-----------+
|That Thing You Do|       1996|
|That Thing You Do|       1996|
+-----------------+-----------+



In [14]:
df_spark = df_spark.withColumn('add_year', col('`mv.released`') + 1)
df_spark.show()

+-----------------+-----------+--------+
|         mv.title|mv.released|add_year|
+-----------------+-----------+--------+
|That Thing You Do|       1996|    1997|
|That Thing You Do|       1996|    1997|
+-----------------+-----------+--------+



In [15]:
df_spark.groupBy('`mv.title`').sum('add_year').show()

+-----------------+-------------+
|         mv.title|sum(add_year)|
+-----------------+-------------+
|That Thing You Do|         3994|
+-----------------+-------------+

