In [1]:
from graphframes import *
from pyspark.sql.functions import *

In [2]:
# Vertices DataFrame: one row per person (id, name, age).
vertex_rows = [
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 37),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 38),
    ("g", "Gabby", 60),
]
v = spark.createDataFrame(vertex_rows, ["id", "name", "age"])

# Edges DataFrame: directed src -> dst edges, each labelled with a relationship type.
edge_rows = [
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend"),
    ("g", "e", "follow"),
]
e = spark.createDataFrame(edge_rows, ["src", "dst", "relationship"])

# Assemble the GraphFrame, then display its vertex and edge tables in that order.
g = GraphFrame(v, e)
for frame in (g.vertices, g.edges):
    frame.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 37|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 38|
|  g|  Gabby| 60|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
|  g|  e|      follow|
+---+---+------------+



In [3]:
# Q1: Alice's two-hop neighbors' names, regardless of the edge type.
# Motif: any directed path a -> b -> c; keep paths starting at Alice and project the endpoint c.
two_hop_pattern = "(a)-[]->(b); (b)-[]->(c)"
motifs = (
    g.find(two_hop_pattern)
    .filter('a.name = "Alice"')
    .select("c.name")
)
motifs.show()

+-------+
|   name|
+-------+
|  Fanny|
|  David|
|Charlie|
+-------+



In [6]:
# Q2: Redo the previous question, but exclude Alice's two-hop neighbors who have an edge back to Alice.
# The negated term `!(c)-[]->(a)` drops every path whose endpoint c has an edge to a (Alice).
# Requiring some edge c -> d with d != Alice instead would wrongly keep neighbors that point
# to Alice *and* elsewhere, wrongly drop neighbors with no out-edges at all, and emit one row
# per extra out-edge.
motifs = (
    g.find("(a)-[]->(b); (b)-[]->(c); !(c)-[]->(a)")
    .filter('a.name = "Alice"')
    .select("c.name")
)
motifs.show()

+-------+
|   name|
+-------+
|  Fanny|
|Charlie|
+-------+



In [15]:
# Q3: Everyone who follows Charlie.
# Match single edges a -[e]-> b where b is Charlie and the edge is a "follow" edge.
motifs = (
    g.find("(a)-[e]->(b)")
    .filter('b.name = "Charlie" and e.relationship = "follow"')
    .select("a.name")
)
motifs.show()

+-----+
| name|
+-----+
|Fanny|
|  Bob|
+-----+



In [24]:
# Q4: Find all people who are being followed by at least 2 people.
from pyspark.sql.functions import countDistinct

# Count *distinct* followers per target rather than raw follow edges, so repeated
# a -> b follow edges are not double-counted. Group by id as well as name so two
# different people who share a name are not merged into one group.
motifs = (
    g.find("(a)-[e]->(b)")
    .filter('e.relationship = "follow"')
    .groupBy("b.id", "b.name")
    .agg(countDistinct("a.id").alias("followers"))
    .filter("followers >= 2")
    .select("name")
)
motifs.show()

+-------+
|   name|
+-------+
|Charlie|
+-------+



In [1]:
# Q5: running (stateful) word count over a queue of RDD batches.
from pyspark.streaming import StreamingContext

# 5-second micro-batches on top of the kernel's existing SparkContext `sc`.
ssc = StreamingContext(sc, batchDuration=5)
# Stateful transformations such as updateStateByKey require a checkpoint directory.
ssc.checkpoint("checkpoint")

# Load the corpus and cut it into 10 equally weighted random splits (seed 123).
# queueStream then feeds those splits to the stream as successive batches.
numPartitions = 8
rdd = sc.textFile('./data/adj_noun_pairs.txt', minPartitions=numPartitions)
rddQueue = rdd.randomSplit([1] * 10, seed=123)
lines = ssc.queueStream(rddQueue)
def updateFunc(newValues, runningCount):
    """Fold one batch's counts for a key into that key's running total.

    Args:
        newValues: the counts emitted for the key in the current batch
            (one 1 per occurrence in this notebook); may be empty.
        runningCount: the key's previous total, or None the first time
            the key is seen.

    Returns:
        The updated running total.
    """
    # Use builtins.sum explicitly: `from pyspark.sql.functions import *` earlier
    # in this notebook shadows the builtin `sum` with the Spark SQL aggregate,
    # which would make this function fail on a Restart & Run All.
    import builtins
    if runningCount is None:
        runningCount = 0
    return builtins.sum(newValues, runningCount)

# Tokenise each line on single spaces, emit (word, 1) per occurrence, and keep a
# running total per word across batches via updateFunc.
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
running_counts = pairs.updateStateByKey(updateFunc)

# Re-sort each batch's accumulated state by count, largest first.
counts_sorted = running_counts.transform(
    lambda batch_rdd: batch_rdd.sortBy(lambda kv: kv[1], ascending=False)
)

def printResults(rdd):
    """Print a summary of one batch of running word counts.

    Prints the number of distinct words, the first five (word, count) pairs in
    the RDD's current order, and the running count for 'refinery'. That count is
    printed as 0 if the word has not been seen yet.

    Args:
        rdd: an RDD of (word, count) pairs.
    """
    print("Total distinct words: ", rdd.count())
    print(rdd.take(5))
    # lookup returns a (possibly empty) list of values for the key. Indexing [0]
    # directly would raise IndexError, and abort the streaming job, on any batch
    # where 'refinery' has not appeared yet.
    refinery_counts = rdd.lookup('refinery')
    print('refinery:', refinery_counts[0] if refinery_counts else 0)

# Print a summary after every batch, then run the stream.
counts_sorted.foreachRDD(printResults)

ssc.start()
try:
    # Timeout is in seconds: 50 s covers 10 queued splits at 5 s per batch.
    ssc.awaitTermination(50)
finally:
    # Always stop the StreamingContext, even on KeyboardInterrupt, so a fresh one
    # can be created later in this kernel. Keep the SparkContext alive for reuse.
    ssc.stop(stopSparkContext=False)
print("Finished")

Total distinct words:  51430
[('other', 7782), ('first', 5404), ('many', 4878), ('new', 3219), ('system', 2539)]
refinery: 1
Total distinct words:  76917
[('other', 15486), ('first', 10815), ('many', 9773), ('new', 6272), ('system', 5063)]
refinery: 6
Total distinct words:  97338
[('other', 23129), ('first', 16145), ('many', 14534), ('new', 9363), ('system', 7636)]
refinery: 11
Total distinct words:  114786
[('other', 30805), ('first', 21524), ('many', 19348), ('new', 12514), ('system', 10174)]
refinery: 16


KeyboardInterrupt: 

In [1]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Reuse (or create) the SparkSession and build a 5-second StreamingContext on it.
# NOTE(review): the app name looks copied from an unrelated project; confirm it is intended.
spark = SparkSession.builder.appName('pda_inst_monitor_status_update').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 5)
# Provide a checkpointing directory. Required for stateful transformations
ssc.checkpoint("checkpoint")

# Replay the corpus as 10 equally weighted random splits (seed 123) through a queue stream.
numPartitions = 8
rdd = sc.textFile('./data/adj_noun_pairs.txt', numPartitions)
rddQueue = rdd.randomSplit([1]*10, 123)
lines = ssc.queueStream(rddQueue)

# Print the first ten elements of each RDD generated in this DStream to the console
lines.pprint()

ssc.start()
try:
    # Timeout in seconds (10 splits x 5 s batches).
    ssc.awaitTermination(50)
finally:
    # Stop even on KeyboardInterrupt so the next cell can create a fresh
    # StreamingContext; keep the SparkContext for reuse.
    ssc.stop(stopSparkContext=False)
print("Finished")

-------------------------------------------
Time: 2021-11-14 19:52:15
-------------------------------------------
self-defined anarchist
differ interpretation
social movement
anti-authoritarian society
coercive institution
western philosophy
american society
french revolution
later anarchist
french movement
...

-------------------------------------------
Time: 2021-11-14 19:52:20
-------------------------------------------
french revolution
authoritarian structure
true Levellers
modern era
indian Movement
complete rights
work movement
differ interpretation
many anarchist
succeed Council
...

-------------------------------------------
Time: 2021-11-14 19:52:25
-------------------------------------------
positive label
voluntary association
free society
major text
famous property
more discussion
social institution
first periodical
diverse current
describe anarchism
...

-------------------------------------------
Time: 2021-11-14 19:52:30
-------------------------------------------
pej

KeyboardInterrupt: 

In [1]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Reuse (or create) the SparkSession and build a 5-second StreamingContext on it.
# NOTE(review): the app name looks copied from an unrelated project; confirm it is intended.
spark = SparkSession.builder.appName('pda_inst_monitor_status_update').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 5)
# Provide a checkpointing directory. Required for stateful transformations
ssc.checkpoint("checkpoint")

# Replay the corpus as 10 equally weighted random splits (seed 123) through a queue stream.
numPartitions = 8
rdd = sc.textFile('./data/adj_noun_pairs.txt', numPartitions)
rddQueue = rdd.randomSplit([1]*10, 123)
lines = ssc.queueStream(rddQueue)

# Split each line on single spaces into words.
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch (per-batch counts, no state carried across batches)
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in these DStreams to the console
lines.pprint()
wordCounts.pprint()

ssc.start()
try:
    # Timeout in seconds (10 splits x 5 s batches).
    # NOTE(review): the recorded run of this cell ended with
    # "OSError: [Errno 22] Invalid argument"; the cause is not visible here -- TODO investigate.
    ssc.awaitTermination(50)
finally:
    # Stop even on an error or KeyboardInterrupt so the next cell can create a
    # fresh StreamingContext; keep the SparkContext for reuse.
    ssc.stop(stopSparkContext=False)
print("Finished")

OSError: [Errno 22] Invalid argument