In [2]:
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
print(r.ping())  # True means Redis is connected

True


In [3]:
r.set("name", "Nabeel")
print(r.get("name").decode("utf-8"))  # Output: Nabeel

Nabeel


In [4]:
r.set("counter", 10)
r.incr("counter")
print(r.get("counter"))  # 11

r.decr("counter", amount=2)
print(r.get("counter"))  # 9


b'11'
b'9'


In [4]:
#r.delete("name")


In [5]:
#hash
r.hset("user:1", mapping={"name": "Ali", "age": 25})
print(r.hgetall("user:1"))

{b'name': b'Ali', b'age': b'25'}


In [6]:
#list
r.rpush("fruits", "apple", "banana", "cherry")
print(r.lrange("fruits", 0, -1))  # all elements

[b'apple', b'banana', b'cherry', b'apple', b'banana', b'cherry']


In [8]:
#set
r.sadd("colors", "red", "blue", "green")
print(r.smembers("colors"))

{b'green', b'blue', b'red'}


In [9]:
#Sorted Sets
r.zadd("scores", {"Ali": 100, "Zara": 150})
print(r.zrange("scores", 0, -1, withscores=True))


[(b'Ali', 100.0), (b'Zara', 150.0)]


In [10]:
#TTL
r.set("temp_key", "data", ex=10)  # expires after 10 seconds
print(r.ttl("temp_key"))  # shows remaining time

10


In [None]:
#Publisher
r.publish("channel1", "Hello Redis!")

#SUbscriber
pubsub = r.pubsub()
pubsub.subscribe("channel1")

for message in pubsub.listen():
    print(message)


{'type': 'subscribe', 'pattern': None, 'channel': b'channel1', 'data': 1}


In [7]:
#Transaction
pipe = r.pipeline()
pipe.set("a", 1)
pipe.incr("a")
pipe.execute()


[True, 2]

In [8]:
#pipline batch operation
pipe = r.pipeline()
pipe.set("key1", "value1")
pipe.set("key2", "value2")
pipe.get("key1")
results = pipe.execute()
print(results)

[True, True, b'value1']


In [9]:
#cache
def expensive_function():
    return "Expensive Result"

cache_key = "expensive_result"
if not r.exists(cache_key):
    result = expensive_function()
    r.set(cache_key, result, ex=60)  # cache for 60 seconds
else:
    result = r.get(cache_key).decode()
print(result)


Expensive Result


In [10]:
import uuid

def store_user_session(user_id):
    """
    Generates a unique session token (UUID) for a user and stores it in Redis.
    The session is stored under the key pattern: user:{user_id}:session
    """
    session_key = f"user:{user_id}:session"
    token = str(uuid.uuid4())   # Generate a random UUID as a session token
    r.set(session_key, token)   # Store token in Redis
    return token

def get_user_session(user_id):
    """
    Retrieves the stored session token for the given user_id.
    Returns None if the session does not exist or is expired.
    """
    session_key = f"user:{user_id}:session"
    token = r.get(session_key)
    return token.decode('utf-8') if token else None

def delete_user_session(user_id):
    """
    Deletes the session entry from Redis for the specified user_id.
    """
    session_key = f"user:{user_id}:session"
    r.delete(session_key)

# Usage demonstration
session_token = store_user_session(1001)
print(f"Stored session token: {session_token}")

retrieved_token = get_user_session(1001)
print(f"Retrieved session token: {retrieved_token}")

delete_user_session(1001)
print(f"Session after delete: {get_user_session(1001)}")  # Should be None or empty

Stored session token: 23f043bc-1e10-4ff3-b33f-e60f9c71966f
Retrieved session token: 23f043bc-1e10-4ff3-b33f-e60f9c71966f
Session after delete: None


In [11]:
# LPUSH: Push an element to the head (left) of the list
r.lpush("task_queue", "task1")

# RPUSH: Push an element to the tail (right) of the list
r.rpush("task_queue", "task2")
r.rpush("task_queue", "task3")

# LPOP: Pop (remove and return) the element at the head
task = r.lpop("task_queue")
print(task)  # b'task1'

# Optional: RPOP removes and returns the element at the tail
task = r.rpop("task_queue")
print(task)  # b'task3'

b'task1'
b'task3'


In [12]:
# HSET: Store 'name' and 'email' fields for a user hash key
r.hset("user:1001", "name", "Alice")
r.hset("user:1001", "email", "alice@example.com")

# HGET: Retrieve a single field from the hash
email = r.hget("user:1001", "email")
print(email.decode('utf-8'))  # alice@example.com

# HDEL: Remove a field from the hash
r.hdel("user:1001", "email")

alice@example.com


1

In [13]:
def create_user_profile(user_id, name, email):
    """
    Creates a user profile in Redis under the key 'user:{user_id}'.
    'name' and 'email' are stored as separate fields in the hash.
    """
    user_key = f"user:{user_id}"
    r.hset(user_key, mapping={"name": name, "email": email})

def get_user_profile(user_id):
    """
    Retrieves and returns all fields in the user profile hash
    as a Python dictionary. Keys and values are decoded from bytes.
    """
    user_key = f"user:{user_id}"
    profile_data = r.hgetall(user_key)
    return {k.decode('utf-8'): v.decode('utf-8') for k, v in profile_data.items()}

def delete_user_profile(user_id):
    """
    Deletes the entire user profile key from Redis.
    """
    user_key = f"user:{user_id}"
    r.delete(user_key)

# Usage demonstration
create_user_profile(1002, "Bob", "bob@example.com")
print(get_user_profile(1002))  # e.g. {'name': 'Bob', 'email': 'bob@example.com'}
delete_user_profile(1002)

{'name': 'Bob', 'email': 'bob@example.com'}


In [14]:
# SADD: Add multiple members to a set
r.sadd("tags:python", "redis", "windows", "backend")

# SMEMBERS: Retrieve all unique members in the set
tags = r.smembers("tags:python")
print(tags)  # {b'redis', b'windows', b'backend'}

# ZADD: Add members with scores
r.zadd("leaderboard", {"player1": 10, "player2": 20})

# ZRANGE: Retrieve members in ascending order of score
leaders = r.zrange("leaderboard", 0, -1, withscores=True)
print(leaders)  # [(b'player1', 10.0), (b'player2', 20.0)]

{b'windows', b'redis', b'backend'}
[(b'player1', 10.0), (b'player2', 20.0)]


In [15]:
def add_tag(post_id, tag):
    """
    Adds a 'tag' to the set of tags belonging to a specific post.
    Each post has its own set under 'post:{post_id}:tags'.
    """
    r.sadd(f"post:{post_id}:tags", tag)

def get_tags(post_id):
    """
    Retrieves all tags for a specific post, decoding the bytes into strings.
    """
    raw_tags = r.smembers(f"post:{post_id}:tags")
    return {tag.decode('utf-8') for tag in raw_tags}

def update_leaderboard(player, score):
    """
    Updates or inserts a player's score in the 'game:leaderboard' sorted set.
    A higher score indicates a better position if sorting descending.
    """
    r.zadd("game:leaderboard", {player: score})

def get_leaderboard():
    """
    Returns an ascending list of (player, score) tuples from the leaderboard.
    To invert the sorting (highest first), you'd use ZREVRANGE.
    """
    entries = r.zrange("game:leaderboard", 0, -1, withscores=True)
    return [(player.decode('utf-8'), score) for player, score in entries]

# Usage demonstration
add_tag(123, "python")
add_tag(123, "redis")
print(get_tags(123))

update_leaderboard("Alice", 300)
update_leaderboard("Bob", 450)
print(get_leaderboard())

{'python', 'redis'}
[('Alice', 300.0), ('Bob', 450.0)]


In [None]:
def blocking_consumer(queue_name):
    """
    Continuously listens to the specified queue (Redis list) using BLPOP,
    which blocks until new items are pushed. Once an item arrives,
    it is removed from the queue and processed.
    """
    print(f"Waiting on queue: {queue_name}")
    while True:
        result = r.blpop(queue_name)
        if result:
            list_name, task_bytes = result
            task = task_bytes.decode('utf-8')
            print(f"Received task: {task}")
        else:
            print("Queue is empty or an error occurred.")
            break

def enqueue_task(queue_name, task):
    """
    Pushes a task to the end of a Redis list (queue).
    """
    r.rpush(queue_name, task)

# Example usage:
enqueue_task("blocking_queue", "task_block_1")
enqueue_task("blocking_queue", "task_block_2")

# In a real application, the consumer might run in a separate thread or process
blocking_consumer("blocking_queue")

Waiting on queue: blocking_queue
Received task: task_block_1
Received task: task_block_2


In [16]:
#caching
import requests
import json
import redis

def get_user_data(user_id):
    """
    Retrieves user data from a hypothetical API endpoint.
    If the data is found in Redis (cache), use that. Otherwise, call the API,
    store the response in Redis with a 60-second expiration, and return it.
    """
    cache_key = f"user_data:{user_id}"
    cached_data = r.get(cache_key)
    if cached_data:
        print("Cache hit!")
        return json.loads(cached_data)

    print("Cache miss. Fetching from API...")
    response = requests.get(f"https://api.example.com/users/{user_id}")
    user_info = response.json()

    # Store in Redis for 60 seconds
    r.setex(cache_key, 60, json.dumps(user_info))
    return user_info

# Usage
user = get_user_data(42)  # First call => cache miss
user_again = get_user_data(42)  # Subsequent call => cache hit

Cache miss. Fetching from API...


ConnectionError: HTTPSConnectionPool(host='api.example.com', port=443): Max retries exceeded with url: /users/42 (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x0000021AE4A06300>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed'))

In [17]:
def store_session_with_expiry(user_id, token, ttl=3600):
    """
    Stores a session token for a specific user with a time-to-live (TTL).
    By default, the session expires after 1 hour (3600 seconds).
    """
    session_key = f"user:{user_id}:session"
    r.setex(session_key, ttl, token)

def get_session_with_expiry(user_id):
    """
    Retrieves the session token for the user. Returns None if the key doesn't exist
    or if it has expired.
    """
    session_key = f"user:{user_id}:session"
    token = r.get(session_key)
    return token.decode('utf-8') if token else None

# Usage
store_session_with_expiry(2001, "session_token_abc", 3600)
retrieved_token = get_session_with_expiry(2001)
print(f"Retrieved token: {retrieved_token}")

Retrieved token: session_token_abc


In [19]:
#https://www.datacamp.com/tutorial/python-redis-beginner-guide
#https://realpython.com/python-redis/