# ManjeetPanda_Lab_RTD
## Count the number of points in each quadrant in real time
Author: Manjeet Panda<br>
1. Create a stream of 2D points.
2. Create a Spark StreamingContext to analyse the streaming points.
3. Apply the quadrant rule to identify the quadrant (or axis) each point lies in.
4. Count the number of points falling into each quadrant.
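The notebook reads its points from a TCP socket at `20.0.41.53:1960`, but the program producing that stream is not shown. A quick manual option is `nc -lk 1960`, typing one point per line. Below is a minimal Python 3 sketch of an automatic generator. It assumes one `x y` pair per newline-terminated line, which is the format `get_quadrant` parses. The names `random_point_line` and `serve_points`, the coordinate range, and the half-second delay are illustrative assumptions, not part of the original lab.

```python
import random
import socket
import time


def random_point_line(rng, low=-10, high=10):
    """Return one 'x y' line with integer coordinates in [low, high].

    Integer coordinates make points on an axis or at the origin likely,
    so every branch of the quadrant rule gets exercised.
    """
    x = rng.randint(low, high)
    y = rng.randint(low, high)
    return "{} {}".format(x, y)


def serve_points(host="0.0.0.0", port=1960, delay=0.5, seed=None):
    """Accept one client (e.g. Spark's socketTextStream) and stream points forever."""
    rng = random.Random(seed)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        conn, _addr = server.accept()
        with conn:
            while True:
                # socketTextStream splits its input on newlines
                conn.sendall((random_point_line(rng) + "\n").encode("utf-8"))
                time.sleep(delay)
```

Run `serve_points()` on the host the streaming context connects to, and start it before calling `ssc.start()`.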

```python
# Import SparkContext and StreamingContext
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
```

```python
# Function to apply the quadrant rule to one line of input such as "3.5 -2"
def get_quadrant(line):
    # Convert the input string into a pair of numbers; a line that does not
    # contain exactly two numbers raises ValueError and is counted as invalid
    try:
        (x, y) = [float(value) for value in line.split()]
    except ValueError:
        # This runs inside the Spark worker processes, so the message usually
        # does not appear in the notebook output
        print("Invalid input")
        return ('Invalid points', 1)

    # Map the pair of numbers to the right quadrant, axis or the origin
    if x > 0 and y > 0:
        quadrant = 'First quadrant'
    elif x < 0 and y > 0:
        quadrant = 'Second quadrant'
    elif x < 0 and y < 0:
        quadrant = 'Third quadrant'
    elif x > 0 and y < 0:
        quadrant = 'Fourth quadrant'
    elif x == 0 and y != 0:
        quadrant = 'Lies on Y axis'
    elif x != 0 and y == 0:
        quadrant = 'Lies on X axis'
    else:
        quadrant = 'Origin'

    # The pair holds the key (quadrant) and the counter increment
    return (quadrant, 1)
```
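The quadrant rule can be checked locally without Spark. The sketch below is an alternative formulation, not the author's code. It maps the sign of each coordinate to a label through a dictionary lookup, which makes it easy to confirm that the nine sign combinations cover every case. `classify_point`, `sign` and `SIGN_LABELS` are hypothetical names, and the sketch targets Python 3. It also treats `nan` as invalid, whereas the `if`/`elif` chain in `get_quadrant` would fall through to `'Origin'` for it.

```python
import math


def sign(value):
    """Return -1, 0 or 1 according to the sign of value."""
    return (value > 0) - (value < 0)


# One label per (sign of x, sign of y) combination
SIGN_LABELS = {
    (1, 1): 'First quadrant',
    (-1, 1): 'Second quadrant',
    (-1, -1): 'Third quadrant',
    (1, -1): 'Fourth quadrant',
    (0, 1): 'Lies on Y axis',
    (0, -1): 'Lies on Y axis',
    (1, 0): 'Lies on X axis',
    (-1, 0): 'Lies on X axis',
    (0, 0): 'Origin',
}


def classify_point(line):
    """Return a (label, 1) pair, like get_quadrant, for one 'x y' line."""
    try:
        x, y = (float(value) for value in line.split())
    except ValueError:
        # Not exactly two values, or a value that is not a number
        return ('Invalid points', 1)
    if math.isnan(x) or math.isnan(y):
        return ('Invalid points', 1)
    return (SIGN_LABELS[(sign(x), sign(y))], 1)


for sample in ["3 4", "-1 2", "-1.5 -2", "1 -2", "0 5", "5 0", "0 0", "a b", "1 2 3"]:
    print(repr(sample), classify_point(sample))
```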

```python
# Create the SparkContext
sc = SparkContext(appName="CoordinateCount")
# Create a StreamingContext with a batch interval of 2 seconds
ssc = StreamingContext(sc, 2)

# Enable checkpointing: updateStateByKey needs a checkpoint directory to store its state
ssc.checkpoint("checkpoint")
# Create a DStream of text lines read from a TCP socket (one point per line)
lines = ssc.socketTextStream("20.0.41.53", 1960)

# Update function: add this batch's new counts to the running count (None for a new key)
updateFunction = lambda new_values, running_count: sum(new_values) + (running_count or 0)

# Update the running count of points in each quadrant
running_counts = lines.map(get_quadrant).updateStateByKey(updateFunction)

# Print the current state (first 10 keys) after every batch
running_counts.pprint()

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()
```
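In every batch, `updateStateByKey` applies the update function to all existing keys. This includes keys that received no new values in that batch, for which `new_values` is empty. That is why the printed counts repeat unchanged while no points arrive. The following pure-Python sketch mimics this behaviour on plain dictionaries, so the running totals can be traced without a cluster. `simulate_update_state_by_key` is a hypothetical helper, not part of the PySpark API, and it ignores partitioning and checkpointing.

```python
from collections import defaultdict


def update_function(new_values, running_count):
    """Same logic as updateFunction in the notebook."""
    return sum(new_values) + (running_count or 0)


def simulate_update_state_by_key(state, batch, update=update_function):
    """Return the new state after one batch of (key, value) pairs.

    The update function is applied to every key that is either already in
    the state or appears in the batch; a result of None drops the key.
    """
    new_values = defaultdict(list)
    for key, value in batch:
        new_values[key].append(value)

    new_state = {}
    for key in set(state) | set(new_values):
        result = update(new_values.get(key, []), state.get(key))
        if result is not None:
            new_state[key] = result
    return new_state


batches = [
    [('First quadrant', 1), ('Origin', 1)],
    [('First quadrant', 1), ('Third quadrant', 1)],
    [],  # no points arrived in this interval
]
state = {}
for batch in batches:
    state = simulate_update_state_by_key(state, batch)
    print(sorted(state.items()))
```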


-------------------------------------------
Time: 2018-04-09 16:43:00
-------------------------------------------
('First quadrant', 1)
('Origin', 1)
('Invalid points', 1)
('Second quadrant', 1)

-------------------------------------------
Time: 2018-04-09 16:43:02
-------------------------------------------
('First quadrant', 1)
('Origin', 1)
('Invalid points', 1)
('Second quadrant', 1)
('Fourth quadrant', 1)

-------------------------------------------
Time: 2018-04-09 16:43:04
-------------------------------------------
('First quadrant', 1)
('Origin', 1)
('Invalid points', 1)
('Second quadrant', 1)
('Fourth quadrant', 1)
('Third quadrant', 1)

-------------------------------------------
Time: 2018-04-09 16:43:06
-------------------------------------------
('First quadrant', 1)
('Origin', 1)
('Invalid points', 1)
('Second quadrant', 1)
('Fourth quadrant', 1)
('Third quadrant', 1)

-------------------------------------------
Time: 2018-04-09 16:43:08
-------------------------------------------

-------------------------------------------
Time: 2018-04-09 16:44:00
-------------------------------------------
('First quadrant', 5)
('Origin', 1)
('Lies on Y axis', 2)
('Invalid points', 3)
('Second quadrant', 2)
('Fourth quadrant', 2)
('Lies on X axis', 2)
('Third quadrant', 3)

-------------------------------------------
Time: 2018-04-09 16:44:02
-------------------------------------------
('First quadrant', 5)
('Origin', 1)
('Lies on Y axis', 2)
('Invalid points', 3)
('Second quadrant', 2)
('Fourth quadrant', 2)
('Lies on X axis', 2)
('Third quadrant', 3)

-------------------------------------------
Time: 2018-04-09 16:44:04
-------------------------------------------
('First quadrant', 5)
('Origin', 1)
('Lies on Y axis', 2)
('Invalid points', 3)
('Second quadrant', 2)
('Fourth quadrant', 2)
('Lies on X axis', 2)
('Third quadrant', 3)

-------------------------------------------
Time: 2018-04-09 16:44:06
-------------------------------------------
('First quadrant', 5)
('Origin'